You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mj...@apache.org on 2016/06/08 13:30:56 UTC
[2/2] accumulo git commit: Merge branch '1.6' into 1.7
Merge branch '1.6' into 1.7
Adds commit for ACCUMULO-4157 to fix bug where WALs were deleted too quickly
for "Dead" Tservers
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f02d564
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f02d564
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f02d564
Branch: refs/heads/1.7
Commit: 5f02d564ec3dae626edb7091fc1a92f5fd760f97
Parents: 0eab0ec e0426c5
Author: Michael Wall <mj...@apache.org>
Authored: Wed Jun 8 08:34:26 2016 -0400
Committer: Michael Wall <mj...@apache.org>
Committed: Wed Jun 8 08:34:26 2016 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 2 +
.../apache/accumulo/core/conf/PropertyTest.java | 5 +
.../gc/GarbageCollectWriteAheadLogs.java | 296 +++++++++++++----
.../gc/GarbageCollectWriteAheadLogsTest.java | 332 ++++++++++++++++++-
4 files changed, 564 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index dbb2036,5fff17f..c427610
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -364,7 -305,8 +364,9 @@@ public enum Property
GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
+ GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
+ GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION,
+ "Time to wait after a tserver is first seen as dead before removing associated WAL files"),
// properties that are specific to the monitor server behavior
MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server."),
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 1735c0d,b7d8d92..a62ffb2
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -51,34 -37,35 +52,39 @@@ import org.apache.accumulo.core.securit
import org.apache.accumulo.core.tabletserver.log.LogEntry;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
+import org.apache.accumulo.core.trace.Span;
+import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.trace.Tracer;
import org.apache.accumulo.core.util.AddressUtil;
-import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.conf.ServerConfiguration;
import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.security.SystemCredentials;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
import org.apache.accumulo.server.util.MetadataTableUtil;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.trace.instrument.Span;
-import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import com.google.common.collect.Iterables;
import com.google.common.net.HostAndPort;
+import com.google.protobuf.InvalidProtocolBufferException;
+ import java.util.concurrent.TimeUnit;
++import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
public class GarbageCollectWriteAheadLogs {
- private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
+ private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
- private final Instance instance;
+ private final AccumuloServerContext context;
private final VolumeManager fs;
+ private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+ private AccumuloConfiguration config;
private boolean useTrash;
@@@ -201,75 -184,202 +216,202 @@@
}
}
- private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+ private AccumuloConfiguration getConfig() {
- return ServerConfiguration.getSystemConfiguration(instance);
++ return context.getServerConfigurationFactory().getConfiguration();
+ }
+
+ /**
+ * Top level method for removing WAL files.
+ * <p>
+ * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods for removal
+ *
+ * @param nameToFileMap
+ * Map of filename to Path
+ * @param serverToFileMap
+ * Map of HostAndPort string to a list of Paths
+ * @param sortedWALogs
+ * Map of sorted WAL names to Path
+ * @param status
+ * GCStatus object for tracking what is done
+ * @return 0 always
+ */
+ @VisibleForTesting
+ int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+ // TODO: remove nameToFileMap from method signature, not used here I don't think
+ AccumuloConfiguration conf = getConfig();
for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
if (entry.getKey().isEmpty()) {
- // old-style log entry, just remove it
- for (Path path : entry.getValue()) {
- log.debug("Removing old-style WAL " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
+ removeOldStyleWAL(entry, status);
} else {
- HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
- if (!holdsLock(address)) {
- for (Path path : entry.getValue()) {
- log.debug("Removing WAL for offline server " + path);
- try {
- if (!useTrash || !fs.moveToTrash(path))
- fs.deleteRecursively(path);
- status.currentLog.deleted++;
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ex) {
- log.error("Unable to delete wal " + path + ": " + ex);
- }
- }
- continue;
- } else {
- Client tserver = null;
- try {
- tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
- tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
- log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
- status.currentLog.deleted += entry.getValue().size();
- } catch (TException e) {
- log.warn("Error talking to " + address + ": " + e);
- } finally {
- if (tserver != null)
- ThriftUtil.returnClient(tserver);
- }
- }
+ removeWALFile(entry, conf, status);
}
}
-
for (Path swalog : sortedWALogs.values()) {
- log.debug("Removing sorted WAL " + swalog);
+ removeSortedWAL(swalog);
+ }
+ return 0;
+ }
+
+ /**
+ * Removes sortedWALs.
+ * <p>
+ * Sorted WALs are WALs that are in the recovery directory and have already been used.
+ *
+ * @param swalog
+ * Path to the WAL
+ */
+ @VisibleForTesting
+ void removeSortedWAL(Path swalog) {
+ log.debug("Removing sorted WAL " + swalog);
+ try {
+ if (!useTrash || !fs.moveToTrash(swalog)) {
+ fs.deleteRecursively(swalog);
+ }
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ioe) {
try {
- if (!useTrash || !fs.moveToTrash(swalog)) {
- fs.deleteRecursively(swalog);
+ if (fs.exists(swalog)) {
+ log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
}
- } catch (FileNotFoundException ex) {
- // ignored
- } catch (IOException ioe) {
+ } catch (IOException ex) {
+ log.error("Unable to check for the existence of " + swalog, ex);
+ }
+ }
+ }
+
+ /**
+ * A wrapper method to check if the tserver using the WAL is still alive
+ * <p>
+   * Delegates the deletion to #removeWALfromDownTserver if the ZK lock is gone or #askTserverToRemoveWAL if the server is known to still be alive
+ *
+ * @param entry
+ * WAL information gathered
+ * @param conf
+ * AccumuloConfiguration object
+ * @param status
+ * GCStatus object
+ */
+ void removeWALFile(Entry<String,ArrayList<Path>> entry, AccumuloConfiguration conf, final GCStatus status) {
+ HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+ if (!holdsLock(address)) {
+ removeWALfromDownTserver(address, conf, entry, status);
+ } else {
+ askTserverToRemoveWAL(address, conf, entry, status);
+ }
+ }
+
+ /**
+   * Asks a currently running tserver to remove its WALs.
+ * <p>
+ * A tserver has more information about whether a WAL is still being used for current mutations. It is safer to ask the tserver to remove the file instead of
+ * just relying on information in the metadata table.
+ *
+ * @param address
+ * HostAndPort of the tserver
+ * @param conf
+ * AccumuloConfiguration entry
+ * @param entry
+ * WAL information gathered
+ * @param status
+ * GCStatus object
+ */
+ @VisibleForTesting
+ void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+ firstSeenDead.remove(address);
+ Client tserver = null;
+ try {
- tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
- tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue()));
++ tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
++ tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
+ log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey());
+ status.currentLog.deleted += entry.getValue().size();
+ } catch (TException e) {
+ log.warn("Error talking to " + address + ": " + e);
+ } finally {
+ if (tserver != null)
+ ThriftUtil.returnClient(tserver);
+ }
+ }
+
+ /**
+ * Get the configured wait period a server has to be dead.
+ * <p>
+   * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT and is a duration. Valid values include a unit with no space like
+ * 3600s, 5m or 2h.
+ *
+ * @param conf
+ * AccumuloConfiguration
+ * @return long that represents the millis to wait
+ */
+ @VisibleForTesting
+ long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+ return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT);
+ }
+
+ /**
+   * Remove walogs associated with a tserver that no longer holds a lock.
+ * <p>
+ * There is configuration option, see #getGCWALDeadServerWaitTime, that defines how long a server must be "dead" before removing the associated write ahead
+   * log files. The intent is to ensure that recovery succeeds for the tablets that were hosted on that tserver.
+ *
+ * @param address
+ * HostAndPort of the tserver with no lock
+ * @param conf
+ * AccumuloConfiguration to get that gc.wal.dead.server.wait info
+ * @param entry
+ * The WALOG path
+ * @param status
+ * GCStatus for tracking changes
+ */
+ @VisibleForTesting
+ void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+ // tserver is down, only delete once configured time has passed
+ if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) {
+ for (Path path : entry.getValue()) {
+ log.debug("Removing WAL for offline server " + address + " at " + path);
try {
- if (fs.exists(swalog)) {
- log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+ if (!useTrash || !fs.moveToTrash(path)) {
+ fs.deleteRecursively(path);
}
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
} catch (IOException ex) {
- log.error("Unable to check for the existence of " + swalog, ex);
+ log.error("Unable to delete wal " + path + ": " + ex);
}
}
+ firstSeenDead.remove(address);
+ } else {
+ log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since it has not be long enough: " + address);
}
+ }
- return 0;
+ /**
+ * Removes old style WAL entries.
+ * <p>
+ * The format for storing WAL info in the metadata table changed at some point, maybe the 1.5 release. Once that is known for sure and we no longer support
+ * upgrading from that version, this code should be removed
+ *
+ * @param entry
+ * Map of empty server address to List of Paths
+ * @param status
+ * GCStatus object
+ */
+ @VisibleForTesting
+ void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+ // old-style log entry, just remove it
+ for (Path path : entry.getValue()) {
+ log.debug("Removing old-style WAL " + path);
+ try {
+ if (!useTrash || !fs.moveToTrash(path))
+ fs.deleteRecursively(path);
+ status.currentLog.deleted++;
+ } catch (FileNotFoundException ex) {
+ // ignored
+ } catch (IOException ex) {
+ log.error("Unable to delete wal " + path + ": " + ex);
+ }
+ }
}
/**
@@@ -311,14 -421,13 +453,15 @@@
return result;
}
- protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ @VisibleForTesting
+ int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
InterruptedException {
int count = 0;
- Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get());
+ Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
+ // For each WAL reference in the metadata table
while (iterator.hasNext()) {
+ // Each metadata reference has at least one WAL file
for (String entry : iterator.next().logSet) {
// old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
// the last "/" will mark a UUID file name.
@@@ -341,101 -448,8 +484,102 @@@
return count;
}
+ protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
+ Connector conn;
+ try {
+ conn = context.getConnector();
+ } catch (AccumuloException | AccumuloSecurityException e) {
+ log.error("Failed to get connector", e);
+ throw new IllegalArgumentException(e);
+ }
+
+ int count = 0;
+
+ Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
+
+ while (walIter.hasNext()) {
+ Entry<String,Path> wal = walIter.next();
+ String fullPath = wal.getValue().toString();
+ if (neededByReplication(conn, fullPath)) {
+ log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
+ // If we haven't already removed it, check to see if this WAL is
+ // "in use" by replication (needed for replication purposes)
+ status.currentLog.inUse++;
+
+ walIter.remove();
+ sortedWALogs.remove(wal.getKey());
+ } else {
+ log.debug("WAL not needed for replication {}", fullPath);
+ }
+ count++;
+ }
+
+ return count;
+ }
+
+ /**
+ * Determine if the given WAL is needed for replication
+ *
+ * @param wal
+ * The full path (URI)
+ * @return True if the WAL is still needed by replication (not a candidate for deletion)
+ */
+ protected boolean neededByReplication(Connector conn, String wal) {
+ log.info("Checking replication table for " + wal);
+
+ Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
+
+ // TODO Push down this filter to the tserver to only return records
+ // that are not completely replicated and convert this loop into a
+ // `return s.iterator.hasNext()` statement
+ for (Entry<Key,Value> entry : iter) {
+ try {
+ Status status = Status.parseFrom(entry.getValue().get());
+ log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status));
+ if (!StatusUtil.isSafeForRemoval(status)) {
+ return true;
+ }
+ } catch (InvalidProtocolBufferException e) {
+ log.error("Could not deserialize Status protobuf for " + entry.getKey(), e);
+ }
+ }
+
+ return false;
+ }
+
+ protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+ Scanner metaScanner;
+ try {
+ metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+ } catch (TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+
+ // Need to add in the replication section prefix
+ metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
+ // Limit the column family to be sure
+ metaScanner.fetchColumnFamily(ReplicationSection.COLF);
+
+ try {
+ Scanner replScanner = ReplicationTable.getScanner(conn);
+
+ // Scan only the Status records
+ StatusSection.limit(replScanner);
+
+ // Only look for this specific WAL
+ replScanner.setRange(Range.exact(wal));
+
+ return Iterables.concat(metaScanner, replScanner);
+ } catch (ReplicationTableOfflineException e) {
+ // do nothing
+ }
+
+ return metaScanner;
+ }
+
- private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+ @VisibleForTesting
+ int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 5801faa,03f5c96..bc9fca3
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@@ -22,58 -23,46 +23,75 @@@ import static org.easymock.EasyMock.rep
import java.io.FileNotFoundException;
import java.io.IOException;
- import java.util.ArrayList;
-
+import java.util.Collections;
- import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+ import java.util.ArrayList;
+ import java.util.HashMap;
import java.util.List;
import java.util.Map;
- import java.util.Map.Entry;
import java.util.UUID;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
- import org.apache.accumulo.core.client.mock.MockInstance;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.ConfigurationCopy;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.conf.SiteConfiguration;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Value;
- import org.apache.accumulo.core.gc.thrift.GCStatus;
- import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
+import org.apache.accumulo.core.protobuf.ProtobufUtil;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
+import org.apache.accumulo.core.replication.ReplicationTable;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.conf.ServerConfigurationFactory;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.replication.StatusUtil;
+import org.apache.accumulo.server.replication.proto.Replication.Status;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Assert;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.gc.thrift.GCStatus;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
+
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+ import org.apache.accumulo.core.client.mock.MockInstance;
+ import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+ import org.apache.accumulo.server.fs.VolumeManagerImpl;
+ import org.apache.zookeeper.KeeperException;
+
+ import java.io.File;
+ import java.util.Arrays;
+ import java.util.LinkedHashMap;
+ import java.util.Map.Entry;
+
+ import static org.easymock.EasyMock.createMock;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertSame;
+ import static org.junit.Assert.assertTrue;
+ import static java.lang.Thread.sleep;
+
+ import java.io.FileOutputStream;
+
+ import org.apache.commons.io.FileUtils;
+
+ import java.util.concurrent.TimeUnit;
+
public class GarbageCollectWriteAheadLogsTest {
private static final long BLOCK_SIZE = 64000000L;
@@@ -370,198 -261,287 +388,484 @@@
assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
}
+ // It was easier to do this than get the mocking working for me
+ private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
+
+ private List<Entry<Key,Value>> replData;
+
+ ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
+ super(context, fs, useTrash);
+ this.replData = replData;
+ }
+
+ @Override
+ protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
+ return this.replData;
+ }
+ }
+
+ @Test
+ public void replicationEntriesAffectGC() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+ Connector conn = createMock(Connector.class);
+
+ // Write a Status record which should prevent file1 from being deleted
+ LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
+
+ ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+
+ replay(conn);
+
+ // Open (not-closed) file must be retained
+ assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
+
+ // No replication data, not needed
+ replData.clear();
+ assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
+
+ // The file is closed but not replicated, must be retained
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
+ assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
+
+ // File is closed and fully replicated, can be deleted
+ replData.clear();
+ replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
+ ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
+ assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
+ }
+
+ @Test
+ public void removeReplicationEntries() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ long file1CreateTime = System.currentTimeMillis();
+ long file2CreateTime = file1CreateTime + 50;
+ BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
+ Mutation m = new Mutation("/wals/" + file1);
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
+ bw.addMutation(m);
+ m = new Mutation("/wals/" + file2);
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
+ bw.addMutation(m);
+
+ // These WALs are potential candidates for deletion from fs
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(file1, new Path("/wals/" + file1));
+ nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+ Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+ // Make the GCStatus and GcCycleStats
+ GCStatus status = new GCStatus();
+ GcCycleStats cycleStats = new GcCycleStats();
+ status.currentLog = cycleStats;
+
+ // We should iterate over two entries
+ Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
+
+ // We should have noted that two files were still in use
+ Assert.assertEquals(2l, cycleStats.inUse);
+
+ // Both should have been deleted
+ Assert.assertEquals(0, nameToFileMap.size());
+ }
+
+ @Test
+ public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
+ String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
+
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+
+ Connector conn = context.getConnector();
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ long file1CreateTime = System.currentTimeMillis();
+ long file2CreateTime = file1CreateTime + 50;
+ // Write some records to the metadata table, we haven't yet written status records to the replication table
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
+ bw.addMutation(m);
+
+ m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
+ bw.addMutation(m);
+
+ // These WALs are potential candidates for deletion from fs
+ Map<String,Path> nameToFileMap = new HashMap<>();
+ nameToFileMap.put(file1, new Path("/wals/" + file1));
+ nameToFileMap.put(file2, new Path("/wals/" + file2));
+
+ Map<String,Path> sortedWALogs = Collections.emptyMap();
+
+ // Make the GCStatus and GcCycleStats objects
+ GCStatus status = new GCStatus();
+ GcCycleStats cycleStats = new GcCycleStats();
+ status.currentLog = cycleStats;
+
+ // We should iterate over two entries
+ Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
+
+ // We should have noted that two files were still in use
+ Assert.assertEquals(2l, cycleStats.inUse);
+
+ // Both should have been deleted
+ Assert.assertEquals(0, nameToFileMap.size());
+ }
+
+ @Test
+ public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+ Connector conn = context.getConnector();
+
+ String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
+ bw.addMutation(m);
+ bw.close();
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
+ Entry<Key,Value> entry = Iterables.getOnlyElement(data);
+
+ Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
+ }
+
+ @Test
+ public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
+ Instance inst = new MockInstance(testName.getMethodName());
+ AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
+ Connector conn = context.getConnector();
+
+ long walCreateTime = System.currentTimeMillis();
+ String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
+ BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
+ Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
+ m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
+ bw.addMutation(m);
+ bw.close();
+
+ bw = ReplicationTable.getBatchWriter(conn);
+ m = new Mutation(wal);
+ StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
+ bw.addMutation(m);
+ bw.close();
+
+ GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+
+ Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
+ Map<Key,Value> data = new HashMap<>();
+ for (Entry<Key,Value> e : iter) {
+ data.put(e.getKey(), e.getValue());
+ }
+
+ Assert.assertEquals(2, data.size());
+
+ // Should get one element from each table (metadata and replication)
+ for (Key k : data.keySet()) {
+ String row = k.getRow().toString();
+ if (row.startsWith(ReplicationSection.getRowPrefix())) {
+ Assert.assertTrue(row.endsWith(wal));
+ } else {
+ Assert.assertEquals(wal, row);
+ }
+ }
+ }
++
+ @Test
+ public void testTimeToDeleteTrue() throws InterruptedException {
+ HostAndPort address = HostAndPort.fromString("tserver1:9998");
+ long wait = AccumuloConfiguration.getTimeInMillis("1s");
+ gcwal.clearFirstSeenDead();
+ assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+ sleep(wait * 2);
+ assertTrue(gcwal.timeToDelete(address, wait));
+ }
+
+ @Test
+ public void testTimeToDeleteFalse() {
+ HostAndPort address = HostAndPort.fromString("tserver1:9998");
+ long wait = AccumuloConfiguration.getTimeInMillis("1h");
+ long t1, t2;
+ boolean ttd;
+ do {
+ t1 = System.nanoTime();
+ gcwal.clearFirstSeenDead();
+ assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+ ttd = gcwal.timeToDelete(address, wait);
+ t2 = System.nanoTime();
+ } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // as long as it took less than half of the configured wait
+
+ assertFalse(ttd);
+ }
+
+ @Test
+ public void testTimeToDeleteWithNullAddress() {
+ assertFalse(gcwal.timeToDelete(null, 123l));
+ }
+
+ /**
+ * Wrapper class with some helper methods
+ * <p>
+ * Just a wrapper around a LinkedHashMap that store method name and argument information. Also includes some convenience methods to make usage cleaner.
+ */
+ class MethodCalls {
+
+ private LinkedHashMap<String,List<Object>> mapWrapper;
+
+ public MethodCalls() {
+ mapWrapper = new LinkedHashMap<String,List<Object>>();
+ }
+
+ public void put(String methodName, Object... args) {
+ mapWrapper.put(methodName, Arrays.asList(args));
+ }
+
+ public int size() {
+ return mapWrapper.size();
+ }
+
+ public boolean hasOneEntry() {
+ return size() == 1;
+ }
+
+ public Map.Entry<String,List<Object>> getFirstEntry() {
+ return mapWrapper.entrySet().iterator().next();
+ }
+
+ public String getFirstEntryMethod() {
+ return getFirstEntry().getKey();
+ }
+
+ public List<Object> getFirstEntryArgs() {
+ return getFirstEntry().getValue();
+ }
+
+ public Object getFirstEntryArg(int number) {
+ return getFirstEntryArgs().get(number);
+ }
+ }
+
+ /**
+ * Partial mock of the GarbageCollectWriteAheadLogs for testing the removeFile method
+ * <p>
+ * There is a map named methodCalls that can be used to assert parameters on methods called inside the removeFile method
+ */
+ class GCWALPartialMock extends GarbageCollectWriteAheadLogs {
+
+ private boolean holdsLockBool = false;
+
- public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
- super(i, vm, useTrash);
++ public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
++ super(ctx, vm, useTrash);
+ this.holdsLockBool = holdLock;
+ }
+
+ public MethodCalls methodCalls = new MethodCalls();
+
+ @Override
+ boolean holdsLock(HostAndPort addr) {
+ return holdsLockBool;
+ }
+
+ @Override
+ void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+ methodCalls.put("removeWALFromDownTserver", address, conf, entry, status);
+ }
+
+ @Override
+ void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+ methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status);
+ }
+
+ @Override
+ void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+ methodCalls.put("removeOldStyleWAL", entry, status);
+ }
+
+ @Override
+ void removeSortedWAL(Path swalog) {
+ methodCalls.put("removeSortedWAL", swalog);
+ }
+ }
+
+ private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
- return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked);
++ AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
++ return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+ }
+
+ private Map<String,Path> getEmptyMap() {
+ return new HashMap<String,Path>();
+ }
+
+ private Map<String,ArrayList<Path>> getServerToFileMap1(String key, Path singlePath) {
+ Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+ serverToFileMap.put(key, new ArrayList<Path>(Arrays.asList(singlePath)));
+ return serverToFileMap;
+ }
+
+ @Test
+ public void testRemoveFilesWithOldStyle() throws IOException {
+ GCStatus status = new GCStatus();
+ GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+ Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+ Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1("", p1);
+
+ realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+ MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+ assertEquals("Only one method should have been called", 1, calls.size());
+ assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", calls.getFirstEntryMethod());
+ Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+ assertEquals("First param should be empty", firstServerToFileMap, calls.getFirstEntryArg(0));
+ assertEquals("Second param should be the status", status, calls.getFirstEntryArg(1));
+ }
+
+ @Test
+ public void testRemoveFilesWithDeadTservers() throws IOException {
+ GCStatus status = new GCStatus();
+ GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, false);
+ String server = "tserver1+9997";
+ Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+ Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+
+ realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+ MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+ assertEquals("Only one method should have been called", 1, calls.size());
+ assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver", calls.getFirstEntryMethod());
+ assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0));
+ assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+ Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+ assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+ assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+ }
+
+ @Test
+ public void testRemoveFilesWithLiveTservers() throws IOException {
+ GCStatus status = new GCStatus();
+ GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+ String server = "tserver1+9997";
+ Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+ Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+
+ realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+ MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+ assertEquals("Only one method should have been called", 1, calls.size());
+ assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod());
+ assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0));
+ assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+ Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+ assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+ assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+ }
+
+ @Test
+ public void testRemoveFilesRemovesSortedWALs() throws IOException {
+ GCStatus status = new GCStatus();
+ GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+ Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+ Map<String,Path> sortedWALogs = new HashMap<String,Path>();
+ Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+ sortedWALogs.put("junk", p1); // TODO: see if this key is actually used here, maybe can be removed
+
+ realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, sortedWALogs, status);
+ MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+ assertEquals("Only one method should have been called", 1, calls.size());
+ assertEquals("Method should be removeSortedWAL", "removeSortedWAL", calls.getFirstEntryMethod());
+ assertEquals("First param should be the Path", p1, calls.getFirstEntryArg(0));
+
+ }
+
+ static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver";
+ static String GCWAL_DEAD_TSERVER = "tserver1";
+ static String GCWAL_DEAD_TSERVER_PORT = "9995";
+ static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString();
+
+ class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
+
- public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException {
- super(i, vm, useTrash);
++ public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
++ super(ctx, vm, useTrash);
+ }
+
+ @Override
+ boolean holdsLock(HostAndPort addr) {
+ // tries use zookeeper
+ return false;
+ }
+
+ @Override
+ Map<String,Path> getSortedWALogs() {
+ return new HashMap<String,Path>();
+ }
+
+ @Override
+ int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+ String sep = File.separator;
+ Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR + sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep
+ + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+ fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT);
+ nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p);
+ return 1;
+ }
+
+ @Override
+ int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+ InterruptedException {
+ return 0;
+ }
+
+ long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+ // tries to use zookeeper
+ return 1000l;
+ }
+ }
+
+ @Test
+ public void testCollectWithDeadTserver() throws IOException, InterruptedException {
+ Instance i = new MockInstance();
++ AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(i));
+ File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + GCWAL_DEAD_DIR);
+ File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT);
+ File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+ if (!walFileDir.exists()) {
- walFileDir.mkdirs();
++ assertTrue("Directory was made", walFileDir.mkdirs());
+ new FileOutputStream(walFile).close();
+ }
+
+ try {
+ VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
- GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false);
++ GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+ GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
+
+ gcwal2.collect(status);
+
+ assertTrue("File should not be deleted", walFile.exists());
+ assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+ assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted());
+
+ sleep(2000);
+ gcwal2.collect(status);
+
+ assertFalse("File should be gone", walFile.exists());
+ assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+ assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted());
+
+ } finally {
+ if (walDir.exists()) {
+ FileUtils.deleteDirectory(walDir);
+ }
+ }
+ }
}