Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/11/23 00:14:42 UTC

[accumulo] branch master updated: Improve Upgrader9to10 code (#1441)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new b0bbff0  Improve Upgrader9to10 code (#1441)
b0bbff0 is described below

commit b0bbff007d372fb3a9e3c4c4f900e063a65a40b1
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Fri Nov 22 19:14:35 2019 -0500

    Improve Upgrader9to10 code (#1441)
    
    This changes the batching strategy for the Upgrader9to10 class.
    
    The previous strategy filled a data structure until JVM memory was
    sufficiently used up (50% of the heap), processed the upgrade for those
    entries, and repeated until no entries were left to upgrade.
    
    The new strategy gathers upgrade candidates into fixed-size batches of 4
    million characters (approximately 8MB per batch, since each Java char is
    two bytes), regardless of available memory.
    
    This stabilizes the testing, because it is much simpler to reproduce and
    test fixed-size batches than to manipulate the JVM heap size during
    testing. As a result, many improvements to the GCUpgrade9to10TestIT were
    made here. This also fixes that IT, which began failing when G1GC replaced
    CMS (#1427), because G1GC did not work well with a master configured with
    a 16MB JVM heap and frequently crashed with OOMEs.
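    
    For illustration, the core of the new approach is a simple char-count
    batcher. The following self-contained sketch mirrors the new
    readCandidatesInBatch method in the diff below; the class name and
    constant here are illustrative, not the actual Upgrader9to10 API:
    
        import java.util.ArrayList;
        import java.util.Iterator;
        import java.util.List;
    
        class CandidateBatcher {
          // 4 million chars at 2 bytes per Java char is roughly an 8MB batch
          static final long BATCH_SIZE = 4_000_000;
    
          // Drain at most one batch from the iterator; callers loop until the
          // iterator is exhausted, flushing writes after each batch.
          static List<String> nextBatch(Iterator<String> candidates) {
            long length = 0;
            List<String> batch = new ArrayList<>();
            while (candidates.hasNext()) {
              String candidate = candidates.next();
              length += candidate.length();
              batch.add(candidate);
              if (length > BATCH_SIZE) {
                break; // over the threshold: hand this batch back for processing
              }
            }
            return batch;
          }
        }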
---
 .../accumulo/master/upgrade/Upgrader9to10.java     |  39 +++----
 .../test/upgrade/GCUpgrade9to10TestIT.java         | 115 +++++++++++----------
 2 files changed, 75 insertions(+), 79 deletions(-)

diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
index 89e7454..2f0d7d3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
@@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
 import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_GC_CANDIDATES;
 import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
-import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;
 
 import java.io.IOException;
@@ -34,7 +33,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.Constants;
@@ -102,11 +100,8 @@ public class Upgrader9to10 implements Upgrader {
   public static final Value UPGRADED = MetadataSchema.DeletesSection.SkewedKeyValue.NAME;
   public static final String OLD_DELETE_PREFIX = "~del";
 
-  /**
-   * This percentage was taken from the SimpleGarbageCollector and if nothing else is going on
-   * during upgrade then it could be larger.
-   */
-  static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f;
 +  // effectively an 8MB batch size, since this is a count of two-byte chars
+  public static final long CANDIDATE_BATCH_SIZE = 4_000_000;
 
   @Override
   public void upgradeZookeeper(ServerContext ctx) {
@@ -405,11 +400,8 @@ public class Upgrader9to10 implements Upgrader {
     try (BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig())) {
       log.info("looking for candidates in table {}", tableName);
       Iterator<String> oldCandidates = getOldCandidates(ctx, tableName);
-      int t = 0; // no waiting first time through
       while (oldCandidates.hasNext()) {
-        // give it some time for memory to clean itself up if needed
-        sleepUninterruptibly(t, TimeUnit.SECONDS);
-        List<String> deletes = readCandidatesThatFitInMemory(oldCandidates);
+        List<String> deletes = readCandidatesInBatch(oldCandidates);
         log.info("found {} deletes to upgrade", deletes.size());
         for (String olddelete : deletes) {
           // create new formatted delete
@@ -428,7 +420,6 @@ public class Upgrader9to10 implements Upgrader {
           writer.addMutation(deleteOldDeleteMutation(olddelete));
         }
         writer.flush();
-        t = 3;
       }
     } catch (TableNotFoundException | MutationsRejectedException e) {
       throw new RuntimeException(e);
@@ -466,14 +457,18 @@ public class Upgrader9to10 implements Upgrader {
         .iterator();
   }
 
-  private List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
+  private List<String> readCandidatesInBatch(Iterator<String> candidates) {
+    long candidateLength = 0;
     List<String> result = new ArrayList<>();
-    // Always read at least one. If memory doesn't clean up fast enough at least
-    // some progress is made.
     while (candidates.hasNext()) {
-      result.add(candidates.next());
-      if (almostOutOfMemory(Runtime.getRuntime()))
+      String candidate = candidates.next();
+      candidateLength += candidate.length();
+      result.add(candidate);
+      if (candidateLength > CANDIDATE_BATCH_SIZE) {
+        log.trace("List of delete candidates has exceeded the batch size"
+            + " threshold. Attempting to delete what has been gathered so far.");
         break;
+      }
     }
     return result;
   }
@@ -484,16 +479,6 @@ public class Upgrader9to10 implements Upgrader {
     return m;
   }
 
-  private boolean almostOutOfMemory(Runtime runtime) {
-    if (runtime.totalMemory() - runtime.freeMemory()
-        > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory()) {
-      log.info("List of delete candidates has exceeded the memory"
-          + " threshold. Attempting to delete what has been gathered so far.");
-      return true;
-    } else
-      return false;
-  }
-
   public void upgradeDirColumns(ServerContext ctx, Ample.DataLevel level) {
     String tableName = level.metaTable();
     AccumuloClient c = ctx;
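
A quick back-of-the-envelope check of the batch math that the refactored IT
below asserts (values taken directly from the test diff that follows; this is
an illustrative sketch, not test code from the commit):

    // Mirrors the IT's numBatches sanity check; values come from the test below.
    long numberOfEntries = 100_000;                  // entries added by the test
    long pathLength = 500;                           // StringUtils.repeat("abcde", 100)
    long batchSize = 4_000_000;                      // Upgrader9to10.CANDIDATE_BATCH_SIZE
    long numBatches = numberOfEntries * pathLength / batchSize; // 12 (12.5, truncated)
    // the IT asserts 10 < numBatches < 15, i.e. the upgrade must run many batches
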
diff --git a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
index 86adfa1..26a8f3a 100644
--- a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.test.upgrade;
 
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
@@ -32,7 +35,6 @@ import org.apache.accumulo.core.client.Accumulo;
 import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
@@ -44,20 +46,18 @@ import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.master.upgrade.Upgrader9to10;
-import org.apache.accumulo.minicluster.MemoryUnit;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
 import org.apache.accumulo.server.gc.GcVolumeUtil;
 import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.junit.Test;
 
-import com.google.common.collect.Iterators;
-
 public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
   private static final String OUR_SECRET = "itsreallysecret";
   private static final String OLDDELPREFIX = "~del";
@@ -72,14 +72,7 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
   public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
     cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
     cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
-    cfg.setDefaultMemory(64, MemoryUnit.MEGABYTE);
-    cfg.setMemory(ServerType.MASTER, 16, MemoryUnit.MEGABYTE);
-    cfg.setMemory(ServerType.ZOOKEEPER, 32, MemoryUnit.MEGABYTE);
-    cfg.setProperty(Property.GC_CYCLE_START, "1");
-    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
-    cfg.setProperty(Property.GC_PORT, "0");
-    cfg.setProperty(Property.TSERV_MAXMEM, "5K");
-    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+    cfg.setProperty(Property.GC_CYCLE_START, "1000"); // gc will be killed before it is run
 
     // use raw local file system so walogs sync and flush will work
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -118,38 +111,65 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
   }
 
   /**
-   * This is really hard to make happen - the minicluster can only use so little memory to start up.
-   * The {@link org.apache.accumulo.master.upgrade.Upgrader9to10} CANDIDATE_MEMORY_PERCENTAGE can be
-   * adjusted.
 +   * Ensure that the total size of the candidates exceeds {@link Upgrader9to10}'s
 +   * CANDIDATE_BATCH_SIZE, forcing cleanup in multiple batches without running out of memory.
    */
   @Test
   public void gcUpgradeOutofMemoryTest() throws Exception {
     killMacGc(); // we do not want anything deleted
 
-    int somebignumber = 100000;
-    String longpathname = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
-        + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj"
-        + "kkkkkkkkkkkkkkkkkklllllllllllllllllllllmmmmmmmmmmmmmmmmmnnnnnnnnnnnnnnnn";
-    longpathname += longpathname; // make it even longer
+    int numberOfEntries = 100_000;
+    String longpathname = StringUtils.repeat("abcde", 100);
+    assertEquals(500, longpathname.length());
+
+    // sanity check to ensure that any batch size assumptions are still valid in this test
+    assertEquals(4_000_000, Upgrader9to10.CANDIDATE_BATCH_SIZE);
+
+    // ensure test quality by making sure we have enough candidates to
+    // exceed the batch size at least ten times
+    long numBatches = numberOfEntries * longpathname.length() / Upgrader9to10.CANDIDATE_BATCH_SIZE;
+    assertTrue("Expected numBatches between 10 and 15, but was " + numBatches,
+        numBatches > 10 && numBatches < 15);
+
     Ample.DataLevel level = Ample.DataLevel.USER;
 
     log.info("Filling metadata table with lots of bogus delete flags");
     try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
-      addEntries(c, level.metaTable(), somebignumber, longpathname);
+      Map<String,String> expected = addEntries(c, level.metaTable(), numberOfEntries, longpathname);
+      assertEquals(numberOfEntries + numberOfEntries / 10, expected.size());
+
+      Range range = MetadataSchema.DeletesSection.getRange();
 
       sleepUninterruptibly(1, TimeUnit.SECONDS);
+      try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+        Map<String,String> actualOldStyle = new HashMap<>();
+        scanner.setRange(range);
+        scanner.forEach(entry -> {
+          String strKey = entry.getKey().getRow().toString();
+          String strValue = entry.getValue().toString();
+          actualOldStyle.put(strKey, strValue);
+        });
+        assertEquals(expected.size(), actualOldStyle.size());
+        assertTrue(Collections.disjoint(expected.keySet(), actualOldStyle.keySet()));
+      }
+
       upgrader.upgradeFileDeletes(getServerContext(), level);
 
       sleepUninterruptibly(1, TimeUnit.SECONDS);
-      Range range = MetadataSchema.DeletesSection.getRange();
-      Scanner scanner;
-      try {
-        scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
-      } catch (TableNotFoundException e) {
-        throw new RuntimeException(e);
+      try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+        Map<String,String> actualNewStyle = new HashMap<>();
+        scanner.setRange(range);
+        scanner.forEach(entry -> {
+          String strKey = entry.getKey().getRow().toString();
+          String expectedValue = expected.get(strKey);
+          assertNotNull(expectedValue);
+          String strValue = entry.getValue().toString();
+          assertEquals(expectedValue, strValue);
+          actualNewStyle.put(strKey, strValue);
+        });
+        assertEquals(expected.size(), actualNewStyle.size());
+        assertEquals(expected, actualNewStyle);
       }
-      scanner.setRange(range);
-      assertEquals(somebignumber + somebignumber / 10, Iterators.size(scanner.iterator()));
     }
   }
 
@@ -160,41 +180,32 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
     try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
 
       Map<String,String> expected = addEntries(c, level.metaTable(), count, "somefile");
-      Map<String,String> actual = new HashMap<>();
 
       sleepUninterruptibly(1, TimeUnit.SECONDS);
       upgrader.upgradeFileDeletes(getServerContext(), level);
       sleepUninterruptibly(1, TimeUnit.SECONDS);
       Range range = MetadataSchema.DeletesSection.getRange();
 
-      Scanner scanner;
-      try {
-        scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
-      } catch (TableNotFoundException e) {
-        throw new RuntimeException(e);
+      try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+        Map<String,String> actual = new HashMap<>();
+        scanner.setRange(range);
+        scanner.forEach(entry -> {
+          actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
+        });
+        assertEquals(expected, actual);
       }
-      scanner.setRange(range);
-      scanner.iterator().forEachRemaining(entry -> {
-        actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
-      });
-
-      assertEquals(expected, actual);
 
       // ENSURE IDEMPOTENCE - run upgrade again to ensure nothing is changed because there is
       // nothing to change
       upgrader.upgradeFileDeletes(getServerContext(), level);
-      try {
-        scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
-      } catch (TableNotFoundException e) {
-        throw new RuntimeException(e);
+      try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+        Map<String,String> actual = new HashMap<>();
+        scanner.setRange(range);
+        scanner.forEach(entry -> {
+          actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
+        });
+        assertEquals(expected, actual);
       }
-      scanner.setRange(range);
-      actual.clear();
-      scanner.iterator().forEachRemaining(entry -> {
-        actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
-      });
-
-      assertEquals(expected, actual);
     }
   }
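
The IT refactoring above also converts every bare Scanner to
try-with-resources. A minimal sketch of that pattern, assuming an
already-built AccumuloClient (the class and method names here are
illustrative placeholders, not code from this commit):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.accumulo.core.client.AccumuloClient;
    import org.apache.accumulo.core.client.Scanner;
    import org.apache.accumulo.core.metadata.schema.MetadataSchema;
    import org.apache.accumulo.core.security.Authorizations;

    class ScannerPatternSketch {
      // Read all delete-marker rows into a map, closing the scanner on exit.
      static Map<String,String> readDeletes(AccumuloClient client, String table)
          throws Exception {
        Map<String,String> actual = new HashMap<>();
        try (Scanner scanner = client.createScanner(table, Authorizations.EMPTY)) {
          scanner.setRange(MetadataSchema.DeletesSection.getRange());
          // collect row -> value, the same shape the IT's assertions consume
          scanner.forEach(entry ->
              actual.put(entry.getKey().getRow().toString(), entry.getValue().toString()));
        } // scanner closed automatically, even if a caller's assertion throws
        return actual;
      }
    }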