You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mj...@apache.org on 2016/06/08 13:30:55 UTC

[1/2] accumulo git commit: ACCUMULO-4157 Bug fix for removing WALs too quickly

Repository: accumulo
Updated Branches:
  refs/heads/1.7 0eab0ecff -> 5f02d564e


ACCUMULO-4157 Bug fix for removing WALs too quickly

Keep track of the first time a tserver is seen down, and only remove WALs for that server once the configured threshold has passed.

Trying to keep the changes small to fix the bug.  I'll create another ticket to refactor and clean up.

Includes an end to end test calling the collect method simulating a dead tserver.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e0426c51
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e0426c51
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e0426c51

Branch: refs/heads/1.7
Commit: e0426c51cd6991741e9be321aaa1e4f5361e0e3e
Parents: beb69cd
Author: Michael Wall <mj...@apache.org>
Authored: Wed Jun 8 08:06:27 2016 -0400
Committer: Michael Wall <mj...@apache.org>
Committed: Wed Jun 8 08:07:47 2016 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../apache/accumulo/core/conf/PropertyTest.java |   5 +
 .../gc/GarbageCollectWriteAheadLogs.java        | 308 ++++++++++++++----
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 320 ++++++++++++++++++-
 4 files changed, 567 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 2149ad9..5fff17f 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -305,6 +305,8 @@ public enum Property {
   GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
   GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
   GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
+  GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION,
+      "Time to wait after a tserver is first seen as dead before removing associated WAL files"),
 
   // properties that are specific to the monitor server behavior
   MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
index bca2e22..4d1dc70 100644
--- a/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/conf/PropertyTest.java
@@ -147,4 +147,9 @@ public class PropertyTest {
       }
     }
   }
+
+  @Test
+  public void testGCDeadServerWaitSecond() {
+    assertEquals("1h", Property.GC_WAL_DEAD_SERVER_WAIT.getDefaultValue());
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 06ace49..b7d8d92 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -16,6 +16,7 @@
  */
 package org.apache.accumulo.gc;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -55,12 +56,16 @@ import org.apache.thrift.TException;
 import org.apache.zookeeper.KeeperException;
 
 import com.google.common.net.HostAndPort;
+import java.util.concurrent.TimeUnit;
+import org.apache.accumulo.core.conf.Property;
 
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 
   private final Instance instance;
   private final VolumeManager fs;
+  private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+  private AccumuloConfiguration config;
 
   private boolean useTrash;
 
@@ -107,6 +112,15 @@ public class GarbageCollectWriteAheadLogs {
     return useTrash;
   }
 
+  /**
+   * Removes all the WAL files that are no longer used.
+   * <p>
+   *
+   * This method is not Threadsafe. SimpleGarbageCollector#run does not invoke collect in a concurrent manner.
+   *
+   * @param status
+   *          GCStatus object
+   */
   public void collect(GCStatus status) {
 
     Span span = Trace.start("scanServers");
@@ -170,76 +184,202 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
-    AccumuloConfiguration conf = ServerConfiguration.getSystemConfiguration(instance);
+  private AccumuloConfiguration getConfig() {
+    return ServerConfiguration.getSystemConfiguration(instance);
+  }
+
+  /**
+   * Top level method for removing WAL files.
+   * <p>
+   * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods for removal
+   *
+   * @param nameToFileMap
+   *          Map of filename to Path
+   * @param serverToFileMap
+   *          Map of HostAndPort string to a list of Paths
+   * @param sortedWALogs
+   *          Map of sorted WAL names to Path
+   * @param status
+   *          GCStatus object for tracking what is done
+   * @return 0 always
+   */
+  @VisibleForTesting
+  int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+    // TODO: remove nameToFileMap from method signature, not used here I don't think
+    AccumuloConfiguration conf = getConfig();
     for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
       if (entry.getKey().isEmpty()) {
-        // old-style log entry, just remove it
-        for (Path path : entry.getValue()) {
-          log.debug("Removing old-style WAL " + path);
-          try {
-            if (!useTrash || !fs.moveToTrash(path))
-              fs.deleteRecursively(path);
-            status.currentLog.deleted++;
-          } catch (FileNotFoundException ex) {
-            // ignored
-          } catch (IOException ex) {
-            log.error("Unable to delete wal " + path + ": " + ex);
-          }
-        }
+        removeOldStyleWAL(entry, status);
       } else {
-        HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
-        if (!holdsLock(address)) {
-          for (Path path : entry.getValue()) {
-            log.debug("Removing WAL for offline server " + path);
-            try {
-              if (!useTrash || !fs.moveToTrash(path))
-                fs.deleteRecursively(path);
-              status.currentLog.deleted++;
-            } catch (FileNotFoundException ex) {
-              // ignored
-            } catch (IOException ex) {
-              log.error("Unable to delete wal " + path + ": " + ex);
-            }
-          }
-          continue;
-        } else {
-          Client tserver = null;
-          try {
-            tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
-            tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue()));
-            log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
-            status.currentLog.deleted += entry.getValue().size();
-          } catch (TException e) {
-            log.warn("Error talking to " + address + ": " + e);
-          } finally {
-            if (tserver != null)
-              ThriftUtil.returnClient(tserver);
-          }
-        }
+        removeWALFile(entry, conf, status);
       }
     }
-
     for (Path swalog : sortedWALogs.values()) {
-      log.debug("Removing sorted WAL " + swalog);
+      removeSortedWAL(swalog);
+    }
+    return 0;
+  }
+
+  /**
+   * Removes sortedWALs.
+   * <p>
+   * Sorted WALs are WALs that are in the recovery directory and have already been used.
+   *
+   * @param swalog
+   *          Path to the WAL
+   */
+  @VisibleForTesting
+  void removeSortedWAL(Path swalog) {
+    log.debug("Removing sorted WAL " + swalog);
+    try {
+      if (!useTrash || !fs.moveToTrash(swalog)) {
+        fs.deleteRecursively(swalog);
+      }
+    } catch (FileNotFoundException ex) {
+      // ignored
+    } catch (IOException ioe) {
       try {
-        if (!useTrash || !fs.moveToTrash(swalog)) {
-          fs.deleteRecursively(swalog);
+        if (fs.exists(swalog)) {
+          log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
         }
-      } catch (FileNotFoundException ex) {
-        // ignored
-      } catch (IOException ioe) {
+      } catch (IOException ex) {
+        log.error("Unable to check for the existence of " + swalog, ex);
+      }
+    }
+  }
+
+  /**
+   * A wrapper method to check if the tserver using the WAL is still alive
+   * <p>
+   * Delegates the deletion to #removeWALfromDownTserver if the ZK lock is gone or #askTserverToRemoveWAL if the server is known to still be alive
+   *
+   * @param entry
+   *          WAL information gathered
+   * @param conf
+   *          AccumuloConfiguration object
+   * @param status
+   *          GCStatus object
+   */
+  void removeWALFile(Entry<String,ArrayList<Path>> entry, AccumuloConfiguration conf, final GCStatus status) {
+    HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+    if (!holdsLock(address)) {
+      removeWALfromDownTserver(address, conf, entry, status);
+    } else {
+      askTserverToRemoveWAL(address, conf, entry, status);
+    }
+  }
+
+  /**
+   * Asks a currently running tserver to remove its WALs.
+   * <p>
+   * A tserver has more information about whether a WAL is still being used for current mutations. It is safer to ask the tserver to remove the file instead of
+   * just relying on information in the metadata table.
+   *
+   * @param address
+   *          HostAndPort of the tserver
+   * @param conf
+   *          AccumuloConfiguration entry
+   * @param entry
+   *          WAL information gathered
+   * @param status
+   *          GCStatus object
+   */
+  @VisibleForTesting
+  void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+    firstSeenDead.remove(address);
+    Client tserver = null;
+    try {
+      tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
+      tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue()));
+      log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey());
+      status.currentLog.deleted += entry.getValue().size();
+    } catch (TException e) {
+      log.warn("Error talking to " + address + ": " + e);
+    } finally {
+      if (tserver != null)
+        ThriftUtil.returnClient(tserver);
+    }
+  }
+
+  /**
+   * Get the configured wait period a server has to be dead.
+   * <p>
+   * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT and is a duration. Valid values include a unit with no space like
+   * 3600s, 5m or 2h.
+   *
+   * @param conf
+   *          AccumuloConfiguration
+   * @return long that represents the millis to wait
+   */
+  @VisibleForTesting
+  long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+    return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT);
+  }
+
+  /**
+   * Remove walogs associated with a tserver that no longer has a lock.
+   * <p>
+   * There is a configuration option, see #getGCWALDeadServerWaitTime, that defines how long a server must be "dead" before removing the associated write ahead
+   * log files. The intent is to ensure that recovery succeeds for the tablets that were hosted on that tserver.
+   *
+   * @param address
+   *          HostAndPort of the tserver with no lock
+   * @param conf
+   *          AccumuloConfiguration to get that gc.wal.dead.server.wait info
+   * @param entry
+   *          The WALOG path
+   * @param status
+   *          GCStatus for tracking changes
+   */
+  @VisibleForTesting
+  void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+    // tserver is down, only delete once configured time has passed
+    if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) {
+      for (Path path : entry.getValue()) {
+        log.debug("Removing WAL for offline server " + address + " at " + path);
         try {
-          if (fs.exists(swalog)) {
-            log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+          if (!useTrash || !fs.moveToTrash(path)) {
+            fs.deleteRecursively(path);
           }
+          status.currentLog.deleted++;
+        } catch (FileNotFoundException ex) {
+          // ignored
         } catch (IOException ex) {
-          log.error("Unable to check for the existence of " + swalog, ex);
+          log.error("Unable to delete wal " + path + ": " + ex);
         }
       }
+      firstSeenDead.remove(address);
+    } else {
+      log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since it has not be long enough: " + address);
     }
+  }
 
-    return 0;
+  /**
+   * Removes old style WAL entries.
+   * <p>
+   * The format for storing WAL info in the metadata table changed at some point, maybe the 1.5 release. Once that is known for sure and we no longer support
+   * upgrading from that version, this code should be removed
+   *
+   * @param entry
+   *          Map of empty server address to List of Paths
+   * @param status
+   *          GCStatus object
+   */
+  @VisibleForTesting
+  void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+    // old-style log entry, just remove it
+    for (Path path : entry.getValue()) {
+      log.debug("Removing old-style WAL " + path);
+      try {
+        if (!useTrash || !fs.moveToTrash(path))
+          fs.deleteRecursively(path);
+        status.currentLog.deleted++;
+      } catch (FileNotFoundException ex) {
+        // ignored
+      } catch (IOException ex) {
+        log.error("Unable to delete wal " + path + ": " + ex);
+      }
+    }
   }
 
   /**
@@ -281,7 +421,8 @@ public class GarbageCollectWriteAheadLogs {
     return result;
   }
 
-  private int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+  @VisibleForTesting
+  int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
       InterruptedException {
     int count = 0;
     Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get());
@@ -307,19 +448,22 @@ public class GarbageCollectWriteAheadLogs {
     return count;
   }
 
-  private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+  @VisibleForTesting
+  int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
     return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
   }
 
   // TODO Remove deprecation warning suppression when Hadoop1 support is dropped
   @SuppressWarnings("deprecation")
   /**
-   * Scans write-ahead log directories for logs. The maps passed in are
-   * populated with scan information.
+   * Scans write-ahead log directories for logs. The maps passed in are populated with scan information.
    *
-   * @param walDirs write-ahead log directories
-   * @param fileToServerMap map of file paths to servers
-   * @param nameToFileMap map of file names to paths
+   * @param walDirs
+   *          write-ahead log directories
+   * @param fileToServerMap
+   *          map of file paths to servers
+   * @param nameToFileMap
+   *          map of file names to paths
    * @return number of servers located (including those with no logs present)
    */
   int scanServers(String[] walDirs, Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
@@ -360,7 +504,8 @@ public class GarbageCollectWriteAheadLogs {
     return servers.size();
   }
 
-  private Map<String,Path> getSortedWALogs() throws IOException {
+  @VisibleForTesting
+  Map<String,Path> getSortedWALogs() throws IOException {
     return getSortedWALogs(ServerConstants.getRecoveryDirs());
   }
 
@@ -410,4 +555,41 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
+  /**
+   * Determine if TServer has been dead long enough to remove associated WALs.
+   * <p>
+   * Uses a map where the key is the address and the value is the time first seen dead. If the address is not in the map, it is added with the current system
+   * nanoTime and false is returned. Once more than the wait time has elapsed, this method returns true; the entry is left in the map for the caller to remove.
+   *
+   * @param address
+   *          HostAndPort of dead tserver
+   * @param wait
+   *          long value of elapsed millis to wait
+   * @return boolean whether enough time elapsed since the server was first seen as dead.
+   */
+  @VisibleForTesting
+  protected boolean timeToDelete(HostAndPort address, long wait) {
+    // check whether the tserver has been dead long enough
+    Long firstSeen = firstSeenDead.get(address);
+    if (firstSeen != null) {
+      long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstSeen);
+      log.trace("Elapsed milliseconds since " + address + " first seen dead: " + elapsedTime);
+      return elapsedTime > wait;
+    } else {
+      log.trace("Adding server to firstSeenDead map " + address);
+      firstSeenDead.put(address, System.nanoTime());
+      return false;
+    }
+  }
+
+  /**
+   * Method to clear the map used in timeToDelete.
+   * <p>
+   * Useful for testing.
+   */
+  @VisibleForTesting
+  void clearFirstSeenDead() {
+    firstSeenDead.clear();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/e0426c51/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 76579f8..03f5c96 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -16,27 +16,53 @@
  */
 package org.apache.accumulo.gc;
 
-import static org.easymock.EasyMock.createMock;
+import com.google.common.net.HostAndPort;
+
 import static org.easymock.EasyMock.expect;
 import static org.easymock.EasyMock.replay;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
 
 import java.io.FileNotFoundException;
+import java.io.IOException;
+
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
+
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.zookeeper.KeeperException;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map.Entry;
+
+import static org.easymock.EasyMock.createMock;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static java.lang.Thread.sleep;
+
+import java.io.FileOutputStream;
+
+import org.apache.commons.io.FileUtils;
+
+import java.util.concurrent.TimeUnit;
+
 public class GarbageCollectWriteAheadLogsTest {
   private static final long BLOCK_SIZE = 64000000L;
 
@@ -234,4 +260,288 @@ public class GarbageCollectWriteAheadLogsTest {
     assertFalse(GarbageCollectWriteAheadLogs.isUUID("0" + UUID.randomUUID().toString()));
     assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
   }
+
+  @Test
+  public void testTimeToDeleteTrue() throws InterruptedException {
+    HostAndPort address = HostAndPort.fromString("tserver1:9998");
+    long wait = AccumuloConfiguration.getTimeInMillis("1s");
+    gcwal.clearFirstSeenDead();
+    assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+    sleep(wait * 2);
+    assertTrue(gcwal.timeToDelete(address, wait));
+  }
+
+  @Test
+  public void testTimeToDeleteFalse() {
+    HostAndPort address = HostAndPort.fromString("tserver1:9998");
+    long wait = AccumuloConfiguration.getTimeInMillis("1h");
+    long t1, t2;
+    boolean ttd;
+    do {
+      t1 = System.nanoTime();
+      gcwal.clearFirstSeenDead();
+      assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+      ttd = gcwal.timeToDelete(address, wait);
+      t2 = System.nanoTime();
+    } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // as long as it took less than half of the configured wait
+
+    assertFalse(ttd);
+  }
+
+  @Test
+  public void testTimeToDeleteWithNullAddress() {
+    assertFalse(gcwal.timeToDelete(null, 123l));
+  }
+
+  /**
+   * Wrapper class with some helper methods
+   * <p>
+   * Just a wrapper around a LinkedHashMap that stores method name and argument information. Also includes some convenience methods to make usage cleaner.
+   */
+  class MethodCalls {
+
+    private LinkedHashMap<String,List<Object>> mapWrapper;
+
+    public MethodCalls() {
+      mapWrapper = new LinkedHashMap<String,List<Object>>();
+    }
+
+    public void put(String methodName, Object... args) {
+      mapWrapper.put(methodName, Arrays.asList(args));
+    }
+
+    public int size() {
+      return mapWrapper.size();
+    }
+
+    public boolean hasOneEntry() {
+      return size() == 1;
+    }
+
+    public Map.Entry<String,List<Object>> getFirstEntry() {
+      return mapWrapper.entrySet().iterator().next();
+    }
+
+    public String getFirstEntryMethod() {
+      return getFirstEntry().getKey();
+    }
+
+    public List<Object> getFirstEntryArgs() {
+      return getFirstEntry().getValue();
+    }
+
+    public Object getFirstEntryArg(int number) {
+      return getFirstEntryArgs().get(number);
+    }
+  }
+
+  /**
+   * Partial mock of the GarbageCollectWriteAheadLogs for testing the removeFiles method
+   * <p>
+   * There is a map named methodCalls that can be used to assert parameters on methods called inside the removeFiles method
+   */
+  class GCWALPartialMock extends GarbageCollectWriteAheadLogs {
+
+    private boolean holdsLockBool = false;
+
+    public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
+      super(i, vm, useTrash);
+      this.holdsLockBool = holdLock;
+    }
+
+    public MethodCalls methodCalls = new MethodCalls();
+
+    @Override
+    boolean holdsLock(HostAndPort addr) {
+      return holdsLockBool;
+    }
+
+    @Override
+    void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+      methodCalls.put("removeWALFromDownTserver", address, conf, entry, status);
+    }
+
+    @Override
+    void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+      methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status);
+    }
+
+    @Override
+    void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+      methodCalls.put("removeOldStyleWAL", entry, status);
+    }
+
+    @Override
+    void removeSortedWAL(Path swalog) {
+      methodCalls.put("removeSortedWAL", swalog);
+    }
+  }
+
+  private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
+    return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked);
+  }
+
+  private Map<String,Path> getEmptyMap() {
+    return new HashMap<String,Path>();
+  }
+
+  private Map<String,ArrayList<Path>> getServerToFileMap1(String key, Path singlePath) {
+    Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+    serverToFileMap.put(key, new ArrayList<Path>(Arrays.asList(singlePath)));
+    return serverToFileMap;
+  }
+
+  @Test
+  public void testRemoveFilesWithOldStyle() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+    Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1("", p1);
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", calls.getFirstEntryMethod());
+    Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+    assertEquals("First param should be empty", firstServerToFileMap, calls.getFirstEntryArg(0));
+    assertEquals("Second param should be the status", status, calls.getFirstEntryArg(1));
+  }
+
+  @Test
+  public void testRemoveFilesWithDeadTservers() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, false);
+    String server = "tserver1+9997";
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+    Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver", calls.getFirstEntryMethod());
+    assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0));
+    assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+    Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+    assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+    assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+  }
+
+  @Test
+  public void testRemoveFilesWithLiveTservers() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+    String server = "tserver1+9997";
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+    Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod());
+    assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0));
+    assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+    Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+    assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+    assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+  }
+
+  @Test
+  public void testRemoveFilesRemovesSortedWALs() throws IOException {
+    GCStatus status = new GCStatus();
+    GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+    Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+    Map<String,Path> sortedWALogs = new HashMap<String,Path>();
+    Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+    sortedWALogs.put("junk", p1); // TODO: see if this key is actually used here, maybe can be removed
+
+    realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, sortedWALogs, status);
+    MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+    assertEquals("Only one method should have been called", 1, calls.size());
+    assertEquals("Method should be removeSortedWAL", "removeSortedWAL", calls.getFirstEntryMethod());
+    assertEquals("First param should be the Path", p1, calls.getFirstEntryArg(0));
+
+  }
+
+  static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver";
+  static String GCWAL_DEAD_TSERVER = "tserver1";
+  static String GCWAL_DEAD_TSERVER_PORT = "9995";
+  static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString();
+
+  class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
+
+    public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException {
+      super(i, vm, useTrash);
+    }
+
+    @Override
+    boolean holdsLock(HostAndPort addr) {
+      // tries to use zookeeper
+      return false;
+    }
+
+    @Override
+    Map<String,Path> getSortedWALogs() {
+      return new HashMap<String,Path>();
+    }
+
+    @Override
+    int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+      String sep = File.separator;
+      Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR + sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep
+          + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+      fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT);
+      nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p);
+      return 1;
+    }
+
+    @Override
+    int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+        InterruptedException {
+      return 0;
+    }
+
+    long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+      // tries to use zookeeper
+      return 1000l;
+    }
+  }
+
+  @Test
+  public void testCollectWithDeadTserver() throws IOException, InterruptedException {
+    Instance i = new MockInstance();
+    File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + GCWAL_DEAD_DIR);
+    File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT);
+    File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+    if (!walFileDir.exists()) {
+      walFileDir.mkdirs();
+      new FileOutputStream(walFile).close();
+    }
+
+    try {
+      VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false);
+      GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
+
+      gcwal2.collect(status);
+
+      assertTrue("File should not be deleted", walFile.exists());
+      assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+      assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted());
+
+      sleep(2000);
+      gcwal2.collect(status);
+
+      assertFalse("File should be gone", walFile.exists());
+      assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+      assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted());
+
+    } finally {
+      if (walDir.exists()) {
+        FileUtils.deleteDirectory(walDir);
+      }
+    }
+  }
 }


[2/2] accumulo git commit: Merge branch '1.6' into 1.7

Posted by mj...@apache.org.
Merge branch '1.6' into 1.7

Adds commit for ACCUMULO-4157 to fix bug where WALs were deleted too quickly
for "Dead" Tservers


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f02d564
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f02d564
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f02d564

Branch: refs/heads/1.7
Commit: 5f02d564ec3dae626edb7091fc1a92f5fd760f97
Parents: 0eab0ec e0426c5
Author: Michael Wall <mj...@apache.org>
Authored: Wed Jun 8 08:34:26 2016 -0400
Committer: Michael Wall <mj...@apache.org>
Committed: Wed Jun 8 08:34:26 2016 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../apache/accumulo/core/conf/PropertyTest.java |   5 +
 .../gc/GarbageCollectWriteAheadLogs.java        | 296 +++++++++++++----
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 332 ++++++++++++++++++-
 4 files changed, 564 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index dbb2036,5fff17f..c427610
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -364,7 -305,8 +364,9 @@@ public enum Property 
    GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
    GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
    GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
 +  GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
+   GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION,
+       "Time to wait after a tserver is first seen as dead before removing associated WAL files"),
  
    // properties that are specific to the monitor server behavior
    MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 1735c0d,b7d8d92..a62ffb2
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -51,34 -37,35 +52,39 @@@ import org.apache.accumulo.core.securit
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.trace.Tracer;
  import org.apache.accumulo.core.util.AddressUtil;
 -import org.apache.accumulo.core.util.ThriftUtil;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.AccumuloServerContext;
  import org.apache.accumulo.server.ServerConstants;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
  import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
 -import org.apache.accumulo.trace.instrument.Tracer;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
 -import org.apache.log4j.Logger;
  import org.apache.thrift.TException;
  import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
 +import com.google.common.collect.Iterables;
  import com.google.common.net.HostAndPort;
 +import com.google.protobuf.InvalidProtocolBufferException;
+ import java.util.concurrent.TimeUnit;
++import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
  
  public class GarbageCollectWriteAheadLogs {
 -  private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 +  private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
  
 -  private final Instance instance;
 +  private final AccumuloServerContext context;
    private final VolumeManager fs;
+   private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+   private AccumuloConfiguration config;
  
    private boolean useTrash;
  
@@@ -201,75 -184,202 +216,202 @@@
      }
    }
  
-   private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+   private AccumuloConfiguration getConfig() {
 -    return ServerConfiguration.getSystemConfiguration(instance);
++    return context.getServerConfigurationFactory().getConfiguration();
+   }
+ 
+   /**
+    * Top level method for removing WAL files.
+    * <p>
+    * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods for removal
+    *
+    * @param nameToFileMap
+    *          Map of filename to Path
+    * @param serverToFileMap
+    *          Map of HostAndPort string to a list of Paths
+    * @param sortedWALogs
+    *          Map of sorted WAL names to Path
+    * @param status
+    *          GCStatus object for tracking what is done
+    * @return 0 always
+    */
+   @VisibleForTesting
+   int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+     // TODO: remove nameToFileMap from method signature, not used here I don't think
+     AccumuloConfiguration conf = getConfig();
      for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
        if (entry.getKey().isEmpty()) {
-         // old-style log entry, just remove it
-         for (Path path : entry.getValue()) {
-           log.debug("Removing old-style WAL " + path);
-           try {
-             if (!useTrash || !fs.moveToTrash(path))
-               fs.deleteRecursively(path);
-             status.currentLog.deleted++;
-           } catch (FileNotFoundException ex) {
-             // ignored
-           } catch (IOException ex) {
-             log.error("Unable to delete wal " + path + ": " + ex);
-           }
-         }
+         removeOldStyleWAL(entry, status);
        } else {
-         HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
-         if (!holdsLock(address)) {
-           for (Path path : entry.getValue()) {
-             log.debug("Removing WAL for offline server " + path);
-             try {
-               if (!useTrash || !fs.moveToTrash(path))
-                 fs.deleteRecursively(path);
-               status.currentLog.deleted++;
-             } catch (FileNotFoundException ex) {
-               // ignored
-             } catch (IOException ex) {
-               log.error("Unable to delete wal " + path + ": " + ex);
-             }
-           }
-           continue;
-         } else {
-           Client tserver = null;
-           try {
-             tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
-             tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
-             log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
-             status.currentLog.deleted += entry.getValue().size();
-           } catch (TException e) {
-             log.warn("Error talking to " + address + ": " + e);
-           } finally {
-             if (tserver != null)
-               ThriftUtil.returnClient(tserver);
-           }
-         }
+         removeWALFile(entry, conf, status);
        }
      }
- 
      for (Path swalog : sortedWALogs.values()) {
-       log.debug("Removing sorted WAL " + swalog);
+       removeSortedWAL(swalog);
+     }
+     return 0;
+   }
+ 
+   /**
+    * Removes sortedWALs.
+    * <p>
+    * Sorted WALs are WALs that are in the recovery directory and have already been used.
+    *
+    * @param swalog
+    *          Path to the WAL
+    */
+   @VisibleForTesting
+   void removeSortedWAL(Path swalog) {
+     log.debug("Removing sorted WAL " + swalog);
+     try {
+       if (!useTrash || !fs.moveToTrash(swalog)) {
+         fs.deleteRecursively(swalog);
+       }
+     } catch (FileNotFoundException ex) {
+       // ignored
+     } catch (IOException ioe) {
        try {
-         if (!useTrash || !fs.moveToTrash(swalog)) {
-           fs.deleteRecursively(swalog);
+         if (fs.exists(swalog)) {
+           log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
          }
-       } catch (FileNotFoundException ex) {
-         // ignored
-       } catch (IOException ioe) {
+       } catch (IOException ex) {
+         log.error("Unable to check for the existence of " + swalog, ex);
+       }
+     }
+   }
+ 
+   /**
+    * A wrapper method to check if the tserver using the WAL is still alive
+    * <p>
+    * Delegates the deletion to #removeWALfromDownTserver if the ZK lock is gone or #askTserverToRemoveWAL if the server is known to still be alive
+    *
+    * @param entry
+    *          WAL information gathered
+    * @param conf
+    *          AccumuloConfiguration object
+    * @param status
+    *          GCStatus object
+    */
+   void removeWALFile(Entry<String,ArrayList<Path>> entry, AccumuloConfiguration conf, final GCStatus status) {
+     HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+     if (!holdsLock(address)) {
+       removeWALfromDownTserver(address, conf, entry, status);
+     } else {
+       askTserverToRemoveWAL(address, conf, entry, status);
+     }
+   }
+ 
+   /**
+    * Asks a currently running tserver to remove its WALs.
+    * <p>
+    * A tserver has more information about whether a WAL is still being used for current mutations. It is safer to ask the tserver to remove the file instead of
+    * just relying on information in the metadata table.
+    *
+    * @param address
+    *          HostAndPort of the tserver
+    * @param conf
+    *          AccumuloConfiguration entry
+    * @param entry
+    *          WAL information gathered
+    * @param status
+    *          GCStatus object
+    */
+   @VisibleForTesting
+   void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+     firstSeenDead.remove(address);
+     Client tserver = null;
+     try {
 -      tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 -      tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue()));
++      tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
++      tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
+       log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey());
+       status.currentLog.deleted += entry.getValue().size();
+     } catch (TException e) {
+       log.warn("Error talking to " + address + ": " + e);
+     } finally {
+       if (tserver != null)
+         ThriftUtil.returnClient(tserver);
+     }
+   }
+ 
+   /**
+    * Get the configured wait period a server has to be dead.
+    * <p>
+    * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT and is a duration. Valid values include a unit with no space like
+    * 3600s, 5m or 2h.
+    *
+    * @param conf
+    *          AccumuloConfiguration
+    * @return long that represents the millis to wait
+    */
+   @VisibleForTesting
+   long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+     return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT);
+   }
+ 
+   /**
+    * Remove walogs associated with a tserver that no longer has a lock.
+    * <p>
+    * There is a configuration option, see #getGCWALDeadServerWaitTime, that defines how long a server must be "dead" before removing the associated write ahead
+    * log files. The intent is to ensure that recovery succeeds for the tablets that were hosted on that tserver.
+    *
+    * @param address
+    *          HostAndPort of the tserver with no lock
+    * @param conf
+    *          AccumuloConfiguration to get the gc.wal.dead.server.wait info
+    * @param entry
+    *          The WALOG path
+    * @param status
+    *          GCStatus for tracking changes
+    */
+   @VisibleForTesting
+   void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+     // tserver is down, only delete once configured time has passed
+     if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) {
+       for (Path path : entry.getValue()) {
+         log.debug("Removing WAL for offline server " + address + " at " + path);
          try {
-           if (fs.exists(swalog)) {
-             log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+           if (!useTrash || !fs.moveToTrash(path)) {
+             fs.deleteRecursively(path);
            }
+           status.currentLog.deleted++;
+         } catch (FileNotFoundException ex) {
+           // ignored
          } catch (IOException ex) {
-           log.error("Unable to check for the existence of " + swalog, ex);
+           log.error("Unable to delete wal " + path + ": " + ex);
          }
        }
+       firstSeenDead.remove(address);
+     } else {
+       log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since it has not be long enough: " + address);
      }
+   }
  
-     return 0;
+   /**
+    * Removes old style WAL entries.
+    * <p>
+    * The format for storing WAL info in the metadata table changed at some point, maybe the 1.5 release. Once that is known for sure and we no longer support
+    * upgrading from that version, this code should be removed
+    *
+    * @param entry
+    *          Map of empty server address to List of Paths
+    * @param status
+    *          GCStatus object
+    */
+   @VisibleForTesting
+   void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+     // old-style log entry, just remove it
+     for (Path path : entry.getValue()) {
+       log.debug("Removing old-style WAL " + path);
+       try {
+         if (!useTrash || !fs.moveToTrash(path))
+           fs.deleteRecursively(path);
+         status.currentLog.deleted++;
+       } catch (FileNotFoundException ex) {
+         // ignored
+       } catch (IOException ex) {
+         log.error("Unable to delete wal " + path + ": " + ex);
+       }
+     }
    }
  
    /**
@@@ -311,14 -421,13 +453,15 @@@
      return result;
    }
  
-   protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+   @VisibleForTesting
+   int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
        InterruptedException {
      int count = 0;
 -    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get());
 +    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
  
 +    // For each WAL reference in the metadata table
      while (iterator.hasNext()) {
 +      // Each metadata reference has at least one WAL file
        for (String entry : iterator.next().logSet) {
          // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
          // the last "/" will mark a UUID file name.
@@@ -341,101 -448,8 +484,102 @@@
      return count;
    }
  
 +  protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
 +      InterruptedException {
 +    Connector conn;
 +    try {
 +      conn = context.getConnector();
 +    } catch (AccumuloException | AccumuloSecurityException e) {
 +      log.error("Failed to get connector", e);
 +      throw new IllegalArgumentException(e);
 +    }
 +
 +    int count = 0;
 +
 +    Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
 +
 +    while (walIter.hasNext()) {
 +      Entry<String,Path> wal = walIter.next();
 +      String fullPath = wal.getValue().toString();
 +      if (neededByReplication(conn, fullPath)) {
 +        log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
 +        // If we haven't already removed it, check to see if this WAL is
 +        // "in use" by replication (needed for replication purposes)
 +        status.currentLog.inUse++;
 +
 +        walIter.remove();
 +        sortedWALogs.remove(wal.getKey());
 +      } else {
 +        log.debug("WAL not needed for replication {}", fullPath);
 +      }
 +      count++;
 +    }
 +
 +    return count;
 +  }
 +
 +  /**
 +   * Determine if the given WAL is needed for replication
 +   *
 +   * @param wal
 +   *          The full path (URI)
 +   * @return True if the WAL is still needed by replication (not a candidate for deletion)
 +   */
 +  protected boolean neededByReplication(Connector conn, String wal) {
 +    log.info("Checking replication table for " + wal);
 +
 +    Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
 +
 +    // TODO Push down this filter to the tserver to only return records
 +    // that are not completely replicated and convert this loop into a
 +    // `return s.iterator.hasNext()` statement
 +    for (Entry<Key,Value> entry : iter) {
 +      try {
 +        Status status = Status.parseFrom(entry.getValue().get());
 +        log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status));
 +        if (!StatusUtil.isSafeForRemoval(status)) {
 +          return true;
 +        }
 +      } catch (InvalidProtocolBufferException e) {
 +        log.error("Could not deserialize Status protobuf for " + entry.getKey(), e);
 +      }
 +    }
 +
 +    return false;
 +  }
 +
 +  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
 +    Scanner metaScanner;
 +    try {
 +      metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    } catch (TableNotFoundException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    // Need to add in the replication section prefix
 +    metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
 +    // Limit the column family to be sure
 +    metaScanner.fetchColumnFamily(ReplicationSection.COLF);
 +
 +    try {
 +      Scanner replScanner = ReplicationTable.getScanner(conn);
 +
 +      // Scan only the Status records
 +      StatusSection.limit(replScanner);
 +
 +      // Only look for this specific WAL
 +      replScanner.setRange(Range.exact(wal));
 +
 +      return Iterables.concat(metaScanner, replScanner);
 +    } catch (ReplicationTableOfflineException e) {
 +      // do nothing
 +    }
 +
 +    return metaScanner;
 +  }
 +
-   private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+   @VisibleForTesting
+   int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
      return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
    }
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 5801faa,03f5c96..bc9fca3
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@@ -22,58 -23,46 +23,75 @@@ import static org.easymock.EasyMock.rep
  
  import java.io.FileNotFoundException;
  import java.io.IOException;
- import java.util.ArrayList;
 -
 +import java.util.Collections;
- import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.LinkedList;
+ import java.util.ArrayList;
+ import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
- import java.util.Map.Entry;
  import java.util.UUID;
  
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;
- import org.apache.accumulo.core.client.mock.MockInstance;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
- import org.apache.accumulo.core.gc.thrift.GCStatus;
- import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.conf.ServerConfigurationFactory;
  import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.easymock.EasyMock;
 +import org.easymock.IAnswer;
 +import org.junit.Assert;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.gc.thrift.GCStatus;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
+ 
  import org.junit.Before;
 +import org.junit.Rule;
  import org.junit.Test;
 +import org.junit.rules.TestName;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
  
+ import org.apache.accumulo.core.client.mock.MockInstance;
+ import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+ import org.apache.accumulo.server.fs.VolumeManagerImpl;
+ import org.apache.zookeeper.KeeperException;
+ 
+ import java.io.File;
+ import java.util.Arrays;
+ import java.util.LinkedHashMap;
+ import java.util.Map.Entry;
+ 
+ import static org.easymock.EasyMock.createMock;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertSame;
+ import static org.junit.Assert.assertTrue;
+ import static java.lang.Thread.sleep;
+ 
+ import java.io.FileOutputStream;
+ 
+ import org.apache.commons.io.FileUtils;
+ 
+ import java.util.concurrent.TimeUnit;
+ 
  public class GarbageCollectWriteAheadLogsTest {
    private static final long BLOCK_SIZE = 64000000L;
  
@@@ -370,198 -261,287 +388,484 @@@
      assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
    }
  
 +  // It was easier to do this than get the mocking working for me
 +  private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
 +
 +    private List<Entry<Key,Value>> replData;
 +
 +    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
 +      super(context, fs, useTrash);
 +      this.replData = replData;
 +    }
 +
 +    @Override
 +    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
 +      return this.replData;
 +    }
 +  }
 +
 +  @Test
 +  public void replicationEntriesAffectGC() throws Exception {
 +    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
 +    Connector conn = createMock(Connector.class);
 +
 +    // Write a Status record which should prevent file1 from being deleted
 +    LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
 +    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 +
 +    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
 +
 +    replay(conn);
 +
 +    // Open (not-closed) file must be retained
 +    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
 +
 +    // No replication data, not needed
 +    replData.clear();
 +    assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
 +
 +    // The file is closed but not replicated, must be retained
 +    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
 +    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
 +
 +    // File is closed and fully replicated, can be deleted
 +    replData.clear();
 +    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
 +        ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
 +    assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
 +  }
 +
 +  @Test
 +  public void removeReplicationEntries() throws Exception {
 +    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
 +
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    long file1CreateTime = System.currentTimeMillis();
 +    long file2CreateTime = file1CreateTime + 50;
 +    BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
 +    Mutation m = new Mutation("/wals/" + file1);
 +    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
 +    bw.addMutation(m);
 +    m = new Mutation("/wals/" + file2);
 +    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
 +    bw.addMutation(m);
 +
 +    // These WALs are potential candidates for deletion from fs
 +    Map<String,Path> nameToFileMap = new HashMap<>();
 +    nameToFileMap.put(file1, new Path("/wals/" + file1));
 +    nameToFileMap.put(file2, new Path("/wals/" + file2));
 +
 +    Map<String,Path> sortedWALogs = Collections.emptyMap();
 +
 +    // Make the GCStatus and GcCycleStats
 +    GCStatus status = new GCStatus();
 +    GcCycleStats cycleStats = new GcCycleStats();
 +    status.currentLog = cycleStats;
 +
 +    // We should iterate over two entries
 +    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
 +
 +    // We should have noted that two files were still in use
 +    Assert.assertEquals(2l, cycleStats.inUse);
 +
 +    // Both should have been removed from the deletion candidates
 +    Assert.assertEquals(0, nameToFileMap.size());
 +  }
 +
 +  @Test
 +  public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
 +    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
 +
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +
 +    Connector conn = context.getConnector();
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    long file1CreateTime = System.currentTimeMillis();
 +    long file2CreateTime = file1CreateTime + 50;
 +    // Write some records to the metadata table, we haven't yet written status records to the replication table
 +    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
 +    bw.addMutation(m);
 +
 +    m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
 +    bw.addMutation(m);
 +
 +    // These WALs are potential candidates for deletion from fs
 +    Map<String,Path> nameToFileMap = new HashMap<>();
 +    nameToFileMap.put(file1, new Path("/wals/" + file1));
 +    nameToFileMap.put(file2, new Path("/wals/" + file2));
 +
 +    Map<String,Path> sortedWALogs = Collections.emptyMap();
 +
 +    // Make the GCStatus and GcCycleStats objects
 +    GCStatus status = new GCStatus();
 +    GcCycleStats cycleStats = new GcCycleStats();
 +    status.currentLog = cycleStats;
 +
 +    // We should iterate over two entries
 +    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
 +
 +    // We should have noted that two files were still in use
 +    Assert.assertEquals(2l, cycleStats.inUse);
 +
 +    // Both should have been removed from the deletion candidates
 +    Assert.assertEquals(0, nameToFileMap.size());
 +  }
 +
 +  @Test
 +  public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +    Connector conn = context.getConnector();
 +
 +    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
 +    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(data);
 +
 +    Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
 +  }
 +
 +  @Test
 +  public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +    Connector conn = context.getConnector();
 +
 +    long walCreateTime = System.currentTimeMillis();
 +    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
 +    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    bw = ReplicationTable.getBatchWriter(conn);
 +    m = new Mutation(wal);
 +    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(walCreateTime));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
 +    Map<Key,Value> data = new HashMap<>();
 +    for (Entry<Key,Value> e : iter) {
 +      data.put(e.getKey(), e.getValue());
 +    }
 +
 +    Assert.assertEquals(2, data.size());
 +
 +    // Should get one element from each table (metadata and replication)
 +    for (Key k : data.keySet()) {
 +      String row = k.getRow().toString();
 +      if (row.startsWith(ReplicationSection.getRowPrefix())) {
 +        Assert.assertTrue(row.endsWith(wal));
 +      } else {
 +        Assert.assertEquals(wal, row);
 +      }
 +    }
 +  }
++
+   @Test
+   public void testTimeToDeleteTrue() throws InterruptedException {
+     HostAndPort address = HostAndPort.fromString("tserver1:9998");
+     long wait = AccumuloConfiguration.getTimeInMillis("1s");
+     gcwal.clearFirstSeenDead();
+     assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+     sleep(wait * 2);
+     assertTrue(gcwal.timeToDelete(address, wait));
+   }
+ 
+   @Test
+   public void testTimeToDeleteFalse() {
+     HostAndPort address = HostAndPort.fromString("tserver1:9998");
+     long wait = AccumuloConfiguration.getTimeInMillis("1h");
+     long t1, t2;
+     boolean ttd;
+     do {
+       t1 = System.nanoTime();
+       gcwal.clearFirstSeenDead();
+       assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+       ttd = gcwal.timeToDelete(address, wait);
+       t2 = System.nanoTime();
+     } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // retry until both calls complete in less than half of the configured wait
+ 
+     assertFalse(ttd);
+   }
+ 
+   @Test
+   public void testTimeToDeleteWithNullAddress() {
+     assertFalse(gcwal.timeToDelete(null, 123l));
+   }
+ 
+   /**
+    * Wrapper class with some helper methods
+    * <p>
+    * Just a wrapper around a LinkedHashMap that stores method name and argument information. Also includes some convenience methods to make usage cleaner.
+    */
+   class MethodCalls {
+ 
+     private LinkedHashMap<String,List<Object>> mapWrapper;
+ 
+     public MethodCalls() {
+       mapWrapper = new LinkedHashMap<String,List<Object>>();
+     }
+ 
+     public void put(String methodName, Object... args) {
+       mapWrapper.put(methodName, Arrays.asList(args));
+     }
+ 
+     public int size() {
+       return mapWrapper.size();
+     }
+ 
+     public boolean hasOneEntry() {
+       return size() == 1;
+     }
+ 
+     public Map.Entry<String,List<Object>> getFirstEntry() {
+       return mapWrapper.entrySet().iterator().next();
+     }
+ 
+     public String getFirstEntryMethod() {
+       return getFirstEntry().getKey();
+     }
+ 
+     public List<Object> getFirstEntryArgs() {
+       return getFirstEntry().getValue();
+     }
+ 
+     public Object getFirstEntryArg(int number) {
+       return getFirstEntryArgs().get(number);
+     }
+   }
+ 
+   /**
+    * Partial mock of GarbageCollectWriteAheadLogs for testing the removeFiles method
+    * <p>
+    * The methodCalls field records the name and arguments of the stubbed removal methods when they are invoked, so tests can assert on the calls made inside removeFiles
+    */
+   class GCWALPartialMock extends GarbageCollectWriteAheadLogs {
+ 
+     private boolean holdsLockBool = false;
+ 
 -    public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
 -      super(i, vm, useTrash);
++    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
++      super(ctx, vm, useTrash);
+       this.holdsLockBool = holdLock;
+     }
+ 
+     public MethodCalls methodCalls = new MethodCalls();
+ 
+     @Override
+     boolean holdsLock(HostAndPort addr) {
+       return holdsLockBool;
+     }
+ 
+     @Override
+     void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+       methodCalls.put("removeWALFromDownTserver", address, conf, entry, status);
+     }
+ 
+     @Override
+     void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+       methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status);
+     }
+ 
+     @Override
+     void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+       methodCalls.put("removeOldStyleWAL", entry, status);
+     }
+ 
+     @Override
+     void removeSortedWAL(Path swalog) {
+       methodCalls.put("removeSortedWAL", swalog);
+     }
+   }
+ 
+   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
 -    return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked);
++    AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
++    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+   }
+ 
+   private Map<String,Path> getEmptyMap() {
+     return new HashMap<String,Path>();
+   }
+ 
+   private Map<String,ArrayList<Path>> getServerToFileMap1(String key, Path singlePath) {
+     Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+     serverToFileMap.put(key, new ArrayList<Path>(Arrays.asList(singlePath)));
+     return serverToFileMap;
+   }
+ 
+   @Test
+   public void testRemoveFilesWithOldStyle() throws IOException {
+     GCStatus status = new GCStatus();
+     GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+     Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+     Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1("", p1);
+ 
+     realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+ 
+     MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+     assertEquals("Only one method should have been called", 1, calls.size());
+     assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", calls.getFirstEntryMethod());
+     Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+     assertEquals("First param should be empty", firstServerToFileMap, calls.getFirstEntryArg(0));
+     assertEquals("Second param should be the status", status, calls.getFirstEntryArg(1));
+   }
+ 
+   @Test
+   public void testRemoveFilesWithDeadTservers() throws IOException {
+     GCStatus status = new GCStatus();
+     GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, false);
+     String server = "tserver1+9997";
+     Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+     Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+ 
+     realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+ 
+     MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+     assertEquals("Only one method should have been called", 1, calls.size());
+     assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver", calls.getFirstEntryMethod());
+     assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0));
+     assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+     Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+     assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+     assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+   }
+ 
+   @Test
+   public void testRemoveFilesWithLiveTservers() throws IOException {
+     GCStatus status = new GCStatus();
+     GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+     String server = "tserver1+9997";
+     Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+     Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+ 
+     realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+ 
+     MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+     assertEquals("Only one method should have been called", 1, calls.size());
+     assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod());
+     assertEquals("First param should be address", HostAndPort.fromString(server.replaceAll("[+]", ":")), calls.getFirstEntryArg(0));
+     assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+     Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+     assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+     assertEquals("Forth param should be the status", status, calls.getFirstEntryArg(3));
+   }
+ 
+   @Test
+   public void testRemoveFilesRemovesSortedWALs() throws IOException {
+     GCStatus status = new GCStatus();
+     GarbageCollectWriteAheadLogs realGCWAL = getGCWALForRemoveFileTest(status, true);
+     Map<String,ArrayList<Path>> serverToFileMap = new HashMap<String,ArrayList<Path>>();
+     Map<String,Path> sortedWALogs = new HashMap<String,Path>();
+     Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+     sortedWALogs.put("junk", p1); // TODO: see if this key is actually used here, maybe can be removed
+ 
+     realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, sortedWALogs, status);
+     MethodCalls calls = ((GCWALPartialMock) realGCWAL).methodCalls;
+     assertEquals("Only one method should have been called", 1, calls.size());
+     assertEquals("Method should be removeSortedWAL", "removeSortedWAL", calls.getFirstEntryMethod());
+     assertEquals("First param should be the Path", p1, calls.getFirstEntryArg(0));
+ 
+   }
+ 
+   static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver";
+   static String GCWAL_DEAD_TSERVER = "tserver1";
+   static String GCWAL_DEAD_TSERVER_PORT = "9995";
+   static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString();
+ 
+   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
+ 
 -    public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException {
 -      super(i, vm, useTrash);
++    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
++      super(ctx, vm, useTrash);
+     }
+ 
+     @Override
+     boolean holdsLock(HostAndPort addr) {
+       // tries to use zookeeper
+       return false;
+     }
+ 
+     @Override
+     Map<String,Path> getSortedWALogs() {
+       return new HashMap<String,Path>();
+     }
+ 
+     @Override
+     int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+       String sep = File.separator;
+       Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR + sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep
+           + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+       fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT);
+       nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p);
+       return 1;
+     }
+ 
+     @Override
+     int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+         InterruptedException {
+       return 0;
+     }
+ 
+     long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+       // tries to use zookeeper
+       return 1000l;
+     }
+   }
+ 
+   @Test
+   public void testCollectWithDeadTserver() throws IOException, InterruptedException {
+     Instance i = new MockInstance();
++    AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(i));
+     File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + GCWAL_DEAD_DIR);
+     File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT);
+     File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+     if (!walFileDir.exists()) {
 -      walFileDir.mkdirs();
++      assertTrue("Directory was made", walFileDir.mkdirs());
+       new FileOutputStream(walFile).close();
+     }
+ 
+     try {
+       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
 -      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false);
++      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
+ 
+       gcwal2.collect(status);
+ 
+       assertTrue("File should not be deleted", walFile.exists());
+       assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+       assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted());
+ 
+       sleep(2000);
+       gcwal2.collect(status);
+ 
+       assertFalse("File should be gone", walFile.exists());
+       assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+       assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted());
+ 
+     } finally {
+       if (walDir.exists()) {
+         FileUtils.deleteDirectory(walDir);
+       }
+     }
+   }
  }