You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2019/11/23 00:14:42 UTC
[accumulo] branch master updated: Improve Upgrader9to10 code (#1441)
This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new b0bbff0 Improve Upgrader9to10 code (#1441)
b0bbff0 is described below
commit b0bbff007d372fb3a9e3c4c4f900e063a65a40b1
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Fri Nov 22 19:14:35 2019 -0500
Improve Upgrader9to10 code (#1441)
This changes the batching strategy for the Upgrader9to10 class.
The previous strategy was to fill a data structure until JVM memory usage
exceeded a threshold (50% of the maximum heap size), then process the
upgrade for those entries, and repeat until no more entries were left to
upgrade. The new strategy groups upgrade candidates into fixed-size batches
of roughly 4 million characters (approx. 8MB, since each Java char is 2
bytes), regardless of available memory.
This stabilizes the tests, because fixed-size batches are much simpler to
reproduce and test than manipulating the JVM heap size during testing. As a
result, many improvements were also made here to GCUpgrade9to10TestIT. This
also fixes that IT, which began failing after the switch from CMS to G1GC
(#1427), because G1GC did not work well with a master configured with a
16MB JVM heap and frequently crashed with OOMEs.
---
.../accumulo/master/upgrade/Upgrader9to10.java | 39 +++----
.../test/upgrade/GCUpgrade9to10TestIT.java | 115 +++++++++++----------
2 files changed, 75 insertions(+), 79 deletions(-)
diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
index 89e7454..2f0d7d3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
@@ -22,7 +22,6 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_GC_CANDIDATES;
import static org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.DIRECTORY_COLUMN;
-import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;
import java.io.IOException;
@@ -34,7 +33,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import org.apache.accumulo.core.Constants;
@@ -102,11 +100,8 @@ public class Upgrader9to10 implements Upgrader {
public static final Value UPGRADED = MetadataSchema.DeletesSection.SkewedKeyValue.NAME;
public static final String OLD_DELETE_PREFIX = "~del";
- /**
- * This percentage was taken from the SimpleGarbageCollector and if nothing else is going on
- * during upgrade then it could be larger.
- */
- static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f;
+ // effectively an 8MB batch size, since this number is the number of Chars
+ public static final long CANDIDATE_BATCH_SIZE = 4_000_000;
@Override
public void upgradeZookeeper(ServerContext ctx) {
@@ -405,11 +400,8 @@ public class Upgrader9to10 implements Upgrader {
try (BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig())) {
log.info("looking for candidates in table {}", tableName);
Iterator<String> oldCandidates = getOldCandidates(ctx, tableName);
- int t = 0; // no waiting first time through
while (oldCandidates.hasNext()) {
- // give it some time for memory to clean itself up if needed
- sleepUninterruptibly(t, TimeUnit.SECONDS);
- List<String> deletes = readCandidatesThatFitInMemory(oldCandidates);
+ List<String> deletes = readCandidatesInBatch(oldCandidates);
log.info("found {} deletes to upgrade", deletes.size());
for (String olddelete : deletes) {
// create new formatted delete
@@ -428,7 +420,6 @@ public class Upgrader9to10 implements Upgrader {
writer.addMutation(deleteOldDeleteMutation(olddelete));
}
writer.flush();
- t = 3;
}
} catch (TableNotFoundException | MutationsRejectedException e) {
throw new RuntimeException(e);
@@ -466,14 +457,18 @@ public class Upgrader9to10 implements Upgrader {
.iterator();
}
- private List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
+ private List<String> readCandidatesInBatch(Iterator<String> candidates) {
+ long candidateLength = 0;
List<String> result = new ArrayList<>();
- // Always read at least one. If memory doesn't clean up fast enough at least
- // some progress is made.
while (candidates.hasNext()) {
- result.add(candidates.next());
- if (almostOutOfMemory(Runtime.getRuntime()))
+ String candidate = candidates.next();
+ candidateLength += candidate.length();
+ result.add(candidate);
+ if (candidateLength > CANDIDATE_BATCH_SIZE) {
+ log.trace("List of delete candidates has exceeded the batch size"
+ + " threshold. Attempting to delete what has been gathered so far.");
break;
+ }
}
return result;
}
@@ -484,16 +479,6 @@ public class Upgrader9to10 implements Upgrader {
return m;
}
- private boolean almostOutOfMemory(Runtime runtime) {
- if (runtime.totalMemory() - runtime.freeMemory()
- > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory()) {
- log.info("List of delete candidates has exceeded the memory"
- + " threshold. Attempting to delete what has been gathered so far.");
- return true;
- } else
- return false;
- }
-
public void upgradeDirColumns(ServerContext ctx, Ample.DataLevel level) {
String tableName = level.metaTable();
AccumuloClient c = ctx;
diff --git a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
index 86adfa1..26a8f3a 100644
--- a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
@@ -20,8 +20,11 @@ package org.apache.accumulo.test.upgrade;
import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
@@ -32,7 +35,6 @@ import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
-import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
@@ -44,20 +46,18 @@ import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.fate.zookeeper.ZooLock;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.master.upgrade.Upgrader9to10;
-import org.apache.accumulo.minicluster.MemoryUnit;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
import org.apache.accumulo.server.gc.GcVolumeUtil;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
-import com.google.common.collect.Iterators;
-
public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
private static final String OUR_SECRET = "itsreallysecret";
private static final String OLDDELPREFIX = "~del";
@@ -72,14 +72,7 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
- cfg.setDefaultMemory(64, MemoryUnit.MEGABYTE);
- cfg.setMemory(ServerType.MASTER, 16, MemoryUnit.MEGABYTE);
- cfg.setMemory(ServerType.ZOOKEEPER, 32, MemoryUnit.MEGABYTE);
- cfg.setProperty(Property.GC_CYCLE_START, "1");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
- cfg.setProperty(Property.GC_PORT, "0");
- cfg.setProperty(Property.TSERV_MAXMEM, "5K");
- cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+ cfg.setProperty(Property.GC_CYCLE_START, "1000"); // gc will be killed before it is run
// use raw local file system so walogs sync and flush will work
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
@@ -118,38 +111,65 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
}
/**
- * This is really hard to make happen - the minicluster can only use so little memory to start up.
- * The {@link org.apache.accumulo.master.upgrade.Upgrader9to10} CANDIDATE_MEMORY_PERCENTAGE can be
- * adjusted.
+ * Ensure that the size of the candidates exceeds the {@link Upgrader9to10}'s CANDIDATE_BATCH_SIZE
+ * and will clean up candidates in multiple batches, without running out of memory.
*/
@Test
public void gcUpgradeOutofMemoryTest() throws Exception {
killMacGc(); // we do not want anything deleted
- int somebignumber = 100000;
- String longpathname = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
- + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj"
- + "kkkkkkkkkkkkkkkkkklllllllllllllllllllllmmmmmmmmmmmmmmmmmnnnnnnnnnnnnnnnn";
- longpathname += longpathname; // make it even longer
+ int numberOfEntries = 100_000;
+ String longpathname = StringUtils.repeat("abcde", 100);
+ assertEquals(500, longpathname.length());
+
+ // sanity check to ensure that any batch size assumptions are still valid in this test
+ assertEquals(4_000_000, Upgrader9to10.CANDIDATE_BATCH_SIZE);
+
+ // ensure test quality by making sure we have enough candidates to
+ // exceed the batch size at least ten times
+ long numBatches = numberOfEntries * longpathname.length() / Upgrader9to10.CANDIDATE_BATCH_SIZE;
+ assertTrue("Expected numBatches between 10 and 15, but was " + numBatches,
+ numBatches > 10 && numBatches < 15);
+
Ample.DataLevel level = Ample.DataLevel.USER;
log.info("Filling metadata table with lots of bogus delete flags");
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
- addEntries(c, level.metaTable(), somebignumber, longpathname);
+ Map<String,String> expected = addEntries(c, level.metaTable(), numberOfEntries, longpathname);
+ assertEquals(numberOfEntries + numberOfEntries / 10, expected.size());
+
+ Range range = MetadataSchema.DeletesSection.getRange();
sleepUninterruptibly(1, TimeUnit.SECONDS);
+ try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+ Map<String,String> actualOldStyle = new HashMap<>();
+ scanner.setRange(range);
+ scanner.forEach(entry -> {
+ String strKey = entry.getKey().getRow().toString();
+ String strValue = entry.getValue().toString();
+ actualOldStyle.put(strKey, strValue);
+ });
+ assertEquals(expected.size(), actualOldStyle.size());
+ assertTrue(Collections.disjoint(expected.keySet(), actualOldStyle.keySet()));
+ }
+
upgrader.upgradeFileDeletes(getServerContext(), level);
sleepUninterruptibly(1, TimeUnit.SECONDS);
- Range range = MetadataSchema.DeletesSection.getRange();
- Scanner scanner;
- try {
- scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
+ try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+ Map<String,String> actualNewStyle = new HashMap<>();
+ scanner.setRange(range);
+ scanner.forEach(entry -> {
+ String strKey = entry.getKey().getRow().toString();
+ String expectedValue = expected.get(strKey);
+ assertNotNull(expectedValue);
+ String strValue = entry.getValue().toString();
+ assertEquals(expectedValue, strValue);
+ actualNewStyle.put(strKey, strValue);
+ });
+ assertEquals(expected.size(), actualNewStyle.size());
+ assertEquals(expected, actualNewStyle);
}
- scanner.setRange(range);
- assertEquals(somebignumber + somebignumber / 10, Iterators.size(scanner.iterator()));
}
}
@@ -160,41 +180,32 @@ public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
Map<String,String> expected = addEntries(c, level.metaTable(), count, "somefile");
- Map<String,String> actual = new HashMap<>();
sleepUninterruptibly(1, TimeUnit.SECONDS);
upgrader.upgradeFileDeletes(getServerContext(), level);
sleepUninterruptibly(1, TimeUnit.SECONDS);
Range range = MetadataSchema.DeletesSection.getRange();
- Scanner scanner;
- try {
- scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
+ try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+ Map<String,String> actual = new HashMap<>();
+ scanner.setRange(range);
+ scanner.forEach(entry -> {
+ actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
+ });
+ assertEquals(expected, actual);
}
- scanner.setRange(range);
- scanner.iterator().forEachRemaining(entry -> {
- actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
- });
-
- assertEquals(expected, actual);
// ENSURE IDEMPOTENCE - run upgrade again to ensure nothing is changed because there is
// nothing to change
upgrader.upgradeFileDeletes(getServerContext(), level);
- try {
- scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
- } catch (TableNotFoundException e) {
- throw new RuntimeException(e);
+ try (Scanner scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY)) {
+ Map<String,String> actual = new HashMap<>();
+ scanner.setRange(range);
+ scanner.forEach(entry -> {
+ actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
+ });
+ assertEquals(expected, actual);
}
- scanner.setRange(range);
- actual.clear();
- scanner.iterator().forEachRemaining(entry -> {
- actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
- });
-
- assertEquals(expected, actual);
}
}