You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by hk...@apache.org on 2019/09/27 14:56:23 UTC
[accumulo] branch master updated: Fix #1365 2.1 Upgrade processing
for #1043 ~del (#1366)
This is an automated email from the ASF dual-hosted git repository.
hkeebler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/master by this push:
new 840fe43 Fix #1365 2.1 Upgrade processing for #1043 ~del (#1366)
840fe43 is described below
commit 840fe4352badb5256151b3ef3a947ecd7a2fa49b
Author: hkeebler <49...@users.noreply.github.com>
AuthorDate: Fri Sep 27 10:56:17 2019 -0400
Fix #1365 2.1 Upgrade processing for #1043 ~del (#1366)
* Fix #1365 2.1 Upgrade processing for #1043 ~del
---
.../accumulo/core/metadata/schema/Ample.java | 17 +-
.../core/metadata/schema/MetadataSchema.java | 8 +
.../accumulo/server/metadata/ServerAmpleImpl.java | 23 +--
.../master/upgrade/UpgradeCoordinator.java | 4 +-
.../accumulo/master/upgrade/Upgrader9to10.java | 101 ++++++++++
.../test/functional/GarbageCollectorIT.java | 10 +-
.../test/upgrade/GCUpgrade9to10TestIT.java | 219 +++++++++++++++++++++
7 files changed, 363 insertions(+), 19 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 9b8a692..61fbf06 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -65,12 +65,16 @@ public interface Ample {
* derived from the table id.
*/
public enum DataLevel {
- ROOT(null), METADATA(RootTable.NAME), USER(MetadataTable.NAME);
+ ROOT(null, null),
+ METADATA(RootTable.NAME, RootTable.ID),
+ USER(MetadataTable.NAME, MetadataTable.ID);
private final String table;
+ private final TableId id;
- private DataLevel(String table) {
+ private DataLevel(String table, TableId id) {
this.table = table;
+ this.id = id;
}
/**
@@ -81,6 +85,15 @@ public interface Ample {
throw new UnsupportedOperationException();
return table;
}
+
+ /**
+ * @return The Id of the Accumulo table in which this data level stores its metadata.
+ */
+ public TableId tableId() {
+ if (id == null)
+ throw new UnsupportedOperationException();
+ return id;
+ }
}
/**
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index b2fdede..1a88785 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -270,6 +270,14 @@ public class MetadataSchema {
return row.substring(encoded_prefix_length);
}
+ /**
+ * Value to indicate that the row has been skewed/encoded.
+ */
+ public static class SkewedKeyValue {
+ public static final String STR_NAME = "skewed";
+ public static final Value NAME = new Value(STR_NAME);
+ }
+
}
/**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index df59341..dc8af4f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.function.Consumer;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -35,13 +36,12 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.AmpleImpl;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.ServerContext;
@@ -51,7 +51,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
public class ServerAmpleImpl extends AmpleImpl implements Ample {
@@ -134,7 +133,7 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
try (BatchWriter writer = context.createBatchWriter(level.metaTable())) {
for (String path : paths) {
- Mutation m = new Mutation(MetadataSchema.DeletesSection.encodeRow(path));
+ Mutation m = new Mutation(DeletesSection.encodeRow(path));
m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
writer.addMutation(m);
}
@@ -155,9 +154,9 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
return candidates.iterator();
} else if (level == DataLevel.METADATA || level == DataLevel.USER) {
- Range range = MetadataSchema.DeletesSection.getRange();
+ Range range = DeletesSection.getRange();
if (continuePoint != null && !continuePoint.isEmpty()) {
- String continueRow = MetadataSchema.DeletesSection.encodeRow(continuePoint);
+ String continueRow = DeletesSection.encodeRow(continuePoint);
range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), true,
range.getEndKey(), range.isEndKeyInclusive());
}
@@ -169,10 +168,9 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
throw new RuntimeException(e);
}
scanner.setRange(range);
-
- return Iterators.transform(scanner.iterator(),
- entry -> MetadataSchema.DeletesSection.decodeRow(entry.getKey().getRow().toString()));
-
+ return StreamSupport.stream(scanner.spliterator(), false)
+ .filter(entry -> entry.getValue().equals(DeletesSection.SkewedKeyValue.NAME))
+ .map(entry -> DeletesSection.decodeRow(entry.getKey().getRow().toString())).iterator();
} else {
throw new IllegalArgumentException();
}
@@ -196,9 +194,8 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
public static Mutation createDeleteMutation(ServerContext context, TableId tableId,
String pathToRemove) {
Path path = context.getVolumeManager().getFullPath(tableId, pathToRemove);
- Mutation delFlag =
- new Mutation(new Text(MetadataSchema.DeletesSection.encodeRow(path.toString())));
- delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
+ Mutation delFlag = new Mutation(new Text(DeletesSection.encodeRow(path.toString())));
+ delFlag.put(EMPTY_TEXT, EMPTY_TEXT, DeletesSection.SkewedKeyValue.NAME);
return delFlag;
}
diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
index 18ca7b9..38d967d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
@@ -38,8 +38,8 @@ public class UpgradeCoordinator {
private boolean haveUpgradedZooKeeper = false;
private boolean startedMetadataUpgrade = false;
private int currentVersion;
- private Map<Integer,Upgrader> upgraders =
- Map.of(ServerConstants.SHORTEN_RFILE_KEYS, new Upgrader8to9());
+ private Map<Integer,Upgrader> upgraders = Map.of(ServerConstants.SHORTEN_RFILE_KEYS,
+ new Upgrader8to9(), ServerConstants.CRYPTO_CHANGES, new Upgrader9to10());
public UpgradeCoordinator(ServerContext ctx) {
int currentVersion = ServerUtil.getAccumuloPersistentVersion(ctx.getVolumeManager());
diff --git a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
index e2ce6c4..c349f8d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
@@ -20,6 +20,8 @@ package org.apache.accumulo.master.upgrade;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_GC_CANDIDATES;
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;
import java.io.IOException;
import java.io.UncheckedIOException;
@@ -27,19 +29,33 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVIterator;
import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -51,6 +67,7 @@ import org.apache.accumulo.server.fs.FileRef;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.metadata.RootGcCandidates;
+import org.apache.accumulo.server.metadata.ServerAmpleImpl;
import org.apache.accumulo.server.metadata.TabletMutatorBase;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -74,6 +91,14 @@ public class Upgrader9to10 implements Upgrader {
public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + "/current_logs";
public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
+ public static final Value UPGRADED = MetadataSchema.DeletesSection.SkewedKeyValue.NAME;
+ public static final String OLD_DELETE_PREFIX = "~del";
+
+ /**
+ * This percentage was taken from the SimpleGarbageCollector and if nothing else is going on
+ * during upgrade then it could be larger.
+ */
+ static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f;
@Override
public void upgradeZookeeper(ServerContext ctx) {
@@ -82,6 +107,8 @@ public class Upgrader9to10 implements Upgrader {
@Override
public void upgradeMetadata(ServerContext ctx) {
+ upgradeFileDeletes(ctx, Ample.DataLevel.METADATA);
+ upgradeFileDeletes(ctx, Ample.DataLevel.USER);
}
@@ -352,4 +379,78 @@ public class Upgrader9to10 implements Upgrader {
}
}
+ public void upgradeFileDeletes(ServerContext ctx, Ample.DataLevel level) {
+
+ String tableName = level.metaTable();
+ AccumuloClient c = ctx;
+
+ // find all deletes
+ try (BatchWriter writer = c.createBatchWriter(tableName, new BatchWriterConfig())) {
+ log.info("looking for candidates in table {}", tableName);
+ Iterator<String> oldCandidates = getOldCandidates(ctx, tableName);
+ int t = 0; // no waiting first time through
+ while (oldCandidates.hasNext()) {
+ // give it some time for memory to clean itself up if needed
+ sleepUninterruptibly(t, TimeUnit.SECONDS);
+ List<String> deletes = readCandidatesThatFitInMemory(oldCandidates);
+ log.info("found {} deletes to upgrade", deletes.size());
+ for (String olddelete : deletes) {
+ // create new formatted delete
+ log.trace("upgrading delete entry for {}", olddelete);
+ writer.addMutation(ServerAmpleImpl.createDeleteMutation(ctx, level.tableId(), olddelete));
+ }
+ writer.flush();
+ // if nothing thrown then we're good so mark all deleted
+ log.info("upgrade processing completed so delete old entries");
+ for (String olddelete : deletes) {
+ log.trace("deleting old entry for {}", olddelete);
+ writer.addMutation(deleteOldDeleteMutation(olddelete));
+ }
+ writer.flush();
+ t = 3;
+ }
+ } catch (TableNotFoundException | MutationsRejectedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private Iterator<String> getOldCandidates(ServerContext ctx, String tableName)
+ throws TableNotFoundException {
+ Range range = MetadataSchema.DeletesSection.getRange();
+ Scanner scanner = ctx.createScanner(tableName, Authorizations.EMPTY);
+ scanner.setRange(range);
+ return StreamSupport.stream(scanner.spliterator(), false)
+ .filter(entry -> !entry.getValue().equals(UPGRADED))
+ .map(entry -> entry.getKey().getRow().toString().substring(OLD_DELETE_PREFIX.length()))
+ .iterator();
+ }
+
+ private List<String> readCandidatesThatFitInMemory(Iterator<String> candidates) {
+ List<String> result = new ArrayList<>();
+ // Always read at least one. If memory doesn't clean up fast enough at least
+ // some progress is made.
+ while (candidates.hasNext()) {
+ result.add(candidates.next());
+ if (almostOutOfMemory(Runtime.getRuntime()))
+ break;
+ }
+ return result;
+ }
+
+ private Mutation deleteOldDeleteMutation(final String delete) {
+ Mutation m = new Mutation(OLD_DELETE_PREFIX + delete);
+ m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
+ return m;
+ }
+
+ private boolean almostOutOfMemory(Runtime runtime) {
+ if (runtime.totalMemory() - runtime.freeMemory()
+ > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory()) {
+ log.info("List of delete candidates has exceeded the memory"
+ + " threshold. Attempting to delete what has been gathered so far.");
+ return true;
+ } else
+ return false;
+ }
+
}
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index d28eaaa..ae9936f 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -57,6 +57,7 @@ import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
+import org.apache.accumulo.server.metadata.ServerAmpleImpl;
import org.apache.accumulo.test.TestIngest;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
@@ -213,7 +214,11 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
try (BatchWriter bw = c.createBatchWriter(MetadataTable.NAME)) {
bw.addMutation(createDelMutation("", "", "", ""));
bw.addMutation(createDelMutation("", "testDel", "test", "valueTest"));
- bw.addMutation(createDelMutation("/", "", "", ""));
+ // path is invalid but value is expected - only way the invalid entry will come through
+ // processing and
+ // show up to produce error in output to allow while loop to end
+ bw.addMutation(
+ createDelMutation("/", "", "", MetadataSchema.DeletesSection.SkewedKeyValue.STR_NAME));
}
ProcessInfo gc = cluster.exec(SimpleGarbageCollector.class);
@@ -304,7 +309,8 @@ public class GarbageCollectorIT extends ConfigurableMacBase {
for (int i = 0; i < 100000; ++i) {
String longpath = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
+ "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj";
- Mutation delFlag = createDelMutation(String.format("/%020d/%s", i, longpath), "", "", "");
+ Mutation delFlag = ServerAmpleImpl.createDeleteMutation(getServerContext(),
+ MetadataTable.ID, String.format("/%020d/%s", i, longpath));
bw.addMutation(delFlag);
}
}
diff --git a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
new file mode 100644
index 0000000..b4b231a
--- /dev/null
+++ b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.upgrade;
+
+import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.master.upgrade.Upgrader9to10;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
+ private static final String OUR_SECRET = "itsreallysecret";
+ private static final String OLDDELPREFIX = "~del";
+ private static final Upgrader9to10 upgrader = new Upgrader9to10();
+
+ @Override
+ public int defaultTimeoutSeconds() {
+ return 5 * 60;
+ }
+
+ @Override
+ public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+ cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+ cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
+ cfg.setDefaultMemory(64, MemoryUnit.MEGABYTE);
+ cfg.setMemory(ServerType.MASTER, 16, MemoryUnit.MEGABYTE);
+ cfg.setMemory(ServerType.ZOOKEEPER, 32, MemoryUnit.MEGABYTE);
+ cfg.setProperty(Property.GC_CYCLE_START, "1");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+ cfg.setProperty(Property.GC_PORT, "0");
+ cfg.setProperty(Property.TSERV_MAXMEM, "5K");
+ cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+
+ // use raw local file system so walogs sync and flush will work
+ hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+ }
+
+ private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException {
+ // kill gc started by MAC
+ getCluster().killProcess(ServerType.GARBAGE_COLLECTOR,
+ getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next());
+ // delete lock in zookeeper if there, this will allow next GC to start quickly
+ String path = getServerContext().getZooKeeperRoot() + Constants.ZGC_LOCK;
+ ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET);
+ try {
+ ZooLock.deleteLock(zk, path);
+ } catch (IllegalStateException e) {
+ log.error("Unable to delete ZooLock for mini accumulo-gc", e);
+ }
+
+ assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
+ }
+
+ @Test
+ public void gcUpgradeRootTableDeletesIT() throws Exception {
+ gcUpgradeDeletesTest(Ample.DataLevel.METADATA, 3);
+ }
+
+ @Test
+ public void gcUpgradeMetadataTableDeletesIT() throws Exception {
+ gcUpgradeDeletesTest(Ample.DataLevel.USER, 3);
+ }
+
+ @Test
+ public void gcUpgradeNoDeletesIT() throws Exception {
+ gcUpgradeDeletesTest(Ample.DataLevel.METADATA, 0);
+
+ }
+
+ /**
+ * This is really hard to make happen - the minicluster can only use so little memory to start up.
+ * The {@link org.apache.accumulo.master.upgrade.Upgrader9to10} CANDIDATE_MEMORY_PERCENTAGE can be
+ * adjusted.
+ */
+ @Test
+ public void gcUpgradeOutofMemoryTest() throws Exception {
+ killMacGc(); // we do not want anything deleted
+
+ int somebignumber = 100000;
+ String longpathname = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
+ + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj"
+ + "kkkkkkkkkkkkkkkkkklllllllllllllllllllllmmmmmmmmmmmmmmmmmnnnnnnnnnnnnnnnn";
+ longpathname += longpathname; // make it even longer
+ Ample.DataLevel level = Ample.DataLevel.USER;
+
+ log.info("Filling metadata table with lots of bogus delete flags");
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+ addEntries(c, level.metaTable(), somebignumber, longpathname);
+
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ upgrader.upgradeFileDeletes(getServerContext(), level);
+
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Range range = MetadataSchema.DeletesSection.getRange();
+ Scanner scanner;
+ try {
+ scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ scanner.setRange(range);
+ assertEquals(somebignumber, Iterators.size(scanner.iterator()));
+ }
+ }
+
+ private void gcUpgradeDeletesTest(Ample.DataLevel level, int count) throws Exception {
+ killMacGc();// we do not want anything deleted
+
+ log.info("Testing delete upgrades for {}", level.metaTable());
+ try (AccumuloClient c = Accumulo.newClient().from(getClientProperties()).build()) {
+
+ Map<String,String> expected = addEntries(c, level.metaTable(), count, "somefile");
+ Map<String,String> actual = new HashMap<>();
+
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ upgrader.upgradeFileDeletes(getServerContext(), level);
+ sleepUninterruptibly(1, TimeUnit.SECONDS);
+ Range range = MetadataSchema.DeletesSection.getRange();
+
+ Scanner scanner;
+ try {
+ scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ scanner.setRange(range);
+ scanner.iterator().forEachRemaining(entry -> {
+ actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
+ });
+
+ assertEquals(expected, actual);
+
+ // ENSURE IDEMPOTENCE - run upgrade again to ensure nothing is changed because there is
+ // nothing to change
+ upgrader.upgradeFileDeletes(getServerContext(), level);
+ try {
+ scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ scanner.setRange(range);
+ actual.clear();
+ scanner.iterator().forEachRemaining(entry -> {
+ actual.put(entry.getKey().getRow().toString(), entry.getValue().toString());
+ });
+ assertEquals(expected, actual);
+ }
+ }
+
+ private Mutation createOldDelMutation(String path, String cf, String cq, String val) {
+ Text row = new Text(OLDDELPREFIX + path);
+ Mutation delFlag = new Mutation(row);
+ delFlag.put(cf, cq, val);
+ return delFlag;
+ }
+
+ private Map<String,String> addEntries(AccumuloClient client, String table, int count,
+ String filename) throws Exception {
+ client.securityOperations().grantTablePermission(client.whoami(), table, TablePermission.WRITE);
+ Map<String,String> expected = new TreeMap<>();
+ try (BatchWriter bw = client.createBatchWriter(table)) {
+ for (int i = 0; i < count; ++i) {
+ String longpath = String.format("hdfs://localhost:8020/%020d/%s", i, filename);
+ Mutation delFlag = createOldDelMutation(longpath, "", "", "");
+ bw.addMutation(delFlag);
+ expected.put(MetadataSchema.DeletesSection.encodeRow(longpath),
+ Upgrader9to10.UPGRADED.toString());
+ }
+ return expected;
+ }
+ }
+
+}