You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mj...@apache.org on 2016/06/08 13:32:02 UTC

[2/4] accumulo git commit: Merge branch '1.6' into 1.7

Merge branch '1.6' into 1.7

Adds commit for ACCUMULO-4157 to fix bug where WALs were deleted too quickly
for "Dead" Tservers


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/5f02d564
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/5f02d564
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/5f02d564

Branch: refs/heads/master
Commit: 5f02d564ec3dae626edb7091fc1a92f5fd760f97
Parents: 0eab0ec e0426c5
Author: Michael Wall <mj...@apache.org>
Authored: Wed Jun 8 08:34:26 2016 -0400
Committer: Michael Wall <mj...@apache.org>
Committed: Wed Jun 8 08:34:26 2016 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |   2 +
 .../apache/accumulo/core/conf/PropertyTest.java |   5 +
 .../gc/GarbageCollectWriteAheadLogs.java        | 296 +++++++++++++----
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 332 ++++++++++++++++++-
 4 files changed, 564 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --cc core/src/main/java/org/apache/accumulo/core/conf/Property.java
index dbb2036,5fff17f..c427610
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@@ -364,7 -305,8 +364,9 @@@ public enum Property 
    GC_DELETE_THREADS("gc.threads.delete", "16", PropertyType.COUNT, "The number of threads used to delete files"),
    GC_TRASH_IGNORE("gc.trash.ignore", "false", PropertyType.BOOLEAN, "Do not use the Trash, even if it is configured"),
    GC_FILE_ARCHIVE("gc.file.archive", "false", PropertyType.BOOLEAN, "Archive any files/directories instead of moving to the HDFS trash or deleting"),
 +  GC_TRACE_PERCENT("gc.trace.percent", "0.01", PropertyType.FRACTION, "Percent of gc cycles to trace"),
+   GC_WAL_DEAD_SERVER_WAIT("gc.wal.dead.server.wait", "1h", PropertyType.TIMEDURATION,
+       "Time to wait after a tserver is first seen as dead before removing associated WAL files"),
  
    // properties that are specific to the monitor server behavior
    MONITOR_PREFIX("monitor.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the monitor web server."),

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --cc server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 1735c0d,b7d8d92..a62ffb2
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@@ -51,34 -37,35 +52,39 @@@ import org.apache.accumulo.core.securit
  import org.apache.accumulo.core.tabletserver.log.LogEntry;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
  import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 +import org.apache.accumulo.core.trace.Span;
 +import org.apache.accumulo.core.trace.Trace;
 +import org.apache.accumulo.core.trace.Tracer;
  import org.apache.accumulo.core.util.AddressUtil;
 -import org.apache.accumulo.core.util.ThriftUtil;
  import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.server.AccumuloServerContext;
  import org.apache.accumulo.server.ServerConstants;
 -import org.apache.accumulo.server.conf.ServerConfiguration;
  import org.apache.accumulo.server.fs.VolumeManager;
 -import org.apache.accumulo.server.security.SystemCredentials;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
  import org.apache.accumulo.server.util.MetadataTableUtil;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 -import org.apache.accumulo.trace.instrument.Span;
 -import org.apache.accumulo.trace.instrument.Trace;
 -import org.apache.accumulo.trace.instrument.Tracer;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.Path;
 -import org.apache.log4j.Logger;
  import org.apache.thrift.TException;
  import org.apache.zookeeper.KeeperException;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
  
 +import com.google.common.collect.Iterables;
  import com.google.common.net.HostAndPort;
 +import com.google.protobuf.InvalidProtocolBufferException;
+ import java.util.concurrent.TimeUnit;
++import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.conf.Property;
  
  public class GarbageCollectWriteAheadLogs {
 -  private static final Logger log = Logger.getLogger(GarbageCollectWriteAheadLogs.class);
 +  private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
  
 -  private final Instance instance;
 +  private final AccumuloServerContext context;
    private final VolumeManager fs;
+   private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+   private AccumuloConfiguration config;
  
    private boolean useTrash;
  
@@@ -201,75 -184,202 +216,202 @@@
      }
    }
  
-   private int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+   private AccumuloConfiguration getConfig() {
 -    return ServerConfiguration.getSystemConfiguration(instance);
++    return context.getServerConfigurationFactory().getConfiguration();
+   }
+ 
+   /**
+    * Top level method for removing WAL files.
+    * <p>
+    * Loops over all the gathered WAL and sortedWAL entries and calls the appropriate methods for removal
+    *
+    * @param nameToFileMap
+    *          Map of filename to Path
+    * @param serverToFileMap
+    *          Map of HostAndPort string to a list of Paths
+    * @param sortedWALogs
+    *          Map of sorted WAL names to Path
+    * @param status
+    *          GCStatus object for tracking what is done
+    * @return 0 always
+    */
+   @VisibleForTesting
+   int removeFiles(Map<String,Path> nameToFileMap, Map<String,ArrayList<Path>> serverToFileMap, Map<String,Path> sortedWALogs, final GCStatus status) {
+     // TODO: remove nameToFileMap from method signature, not used here I don't think
+     AccumuloConfiguration conf = getConfig();
      for (Entry<String,ArrayList<Path>> entry : serverToFileMap.entrySet()) {
        if (entry.getKey().isEmpty()) {
-         // old-style log entry, just remove it
-         for (Path path : entry.getValue()) {
-           log.debug("Removing old-style WAL " + path);
-           try {
-             if (!useTrash || !fs.moveToTrash(path))
-               fs.deleteRecursively(path);
-             status.currentLog.deleted++;
-           } catch (FileNotFoundException ex) {
-             // ignored
-           } catch (IOException ex) {
-             log.error("Unable to delete wal " + path + ": " + ex);
-           }
-         }
+         removeOldStyleWAL(entry, status);
        } else {
-         HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
-         if (!holdsLock(address)) {
-           for (Path path : entry.getValue()) {
-             log.debug("Removing WAL for offline server " + path);
-             try {
-               if (!useTrash || !fs.moveToTrash(path))
-                 fs.deleteRecursively(path);
-               status.currentLog.deleted++;
-             } catch (FileNotFoundException ex) {
-               // ignored
-             } catch (IOException ex) {
-               log.error("Unable to delete wal " + path + ": " + ex);
-             }
-           }
-           continue;
-         } else {
-           Client tserver = null;
-           try {
-             tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
-             tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
-             log.debug("deleted " + entry.getValue() + " from " + entry.getKey());
-             status.currentLog.deleted += entry.getValue().size();
-           } catch (TException e) {
-             log.warn("Error talking to " + address + ": " + e);
-           } finally {
-             if (tserver != null)
-               ThriftUtil.returnClient(tserver);
-           }
-         }
+         removeWALFile(entry, conf, status);
        }
      }
- 
      for (Path swalog : sortedWALogs.values()) {
-       log.debug("Removing sorted WAL " + swalog);
+       removeSortedWAL(swalog);
+     }
+     return 0;
+   }
+ 
+   /**
+    * Removes sortedWALs.
+    * <p>
+    * Sorted WALs are WALs that are in the recovery directory and have already been used.
+    *
+    * @param swalog
+    *          Path to the WAL
+    */
+   @VisibleForTesting
+   void removeSortedWAL(Path swalog) {
+     log.debug("Removing sorted WAL " + swalog);
+     try {
+       if (!useTrash || !fs.moveToTrash(swalog)) {
+         fs.deleteRecursively(swalog);
+       }
+     } catch (FileNotFoundException ex) {
+       // ignored
+     } catch (IOException ioe) {
        try {
-         if (!useTrash || !fs.moveToTrash(swalog)) {
-           fs.deleteRecursively(swalog);
+         if (fs.exists(swalog)) {
+           log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
          }
-       } catch (FileNotFoundException ex) {
-         // ignored
-       } catch (IOException ioe) {
+       } catch (IOException ex) {
+         log.error("Unable to check for the existence of " + swalog, ex);
+       }
+     }
+   }
+ 
+   /**
+    * A wrapper method to check if the tserver using the WAL is still alive
+    * <p>
+    * Delegates the deletion to #removeWALfromDownTserver if the ZK lock is gone or #askTserverToRemoveWAL if the server is known to still be alive
+    *
+    * @param entry
+    *          WAL information gathered
+    * @param conf
+    *          AccumuloConfiguration object
+    * @param status
+    *          GCStatus object
+    */
+   void removeWALFile(Entry<String,ArrayList<Path>> entry, AccumuloConfiguration conf, final GCStatus status) {
+     HostAndPort address = AddressUtil.parseAddress(entry.getKey(), false);
+     if (!holdsLock(address)) {
+       removeWALfromDownTserver(address, conf, entry, status);
+     } else {
+       askTserverToRemoveWAL(address, conf, entry, status);
+     }
+   }
+ 
+   /**
+    * Asks a currently running tserver to remove its WALs.
+    * <p>
+    * A tserver has more information about whether a WAL is still being used for current mutations. It is safer to ask the tserver to remove the file instead of
+    * just relying on information in the metadata table.
+    *
+    * @param address
+    *          HostAndPort of the tserver
+    * @param conf
+    *          AccumuloConfiguration entry
+    * @param entry
+    *          WAL information gathered
+    * @param status
+    *          GCStatus object
+    */
+   @VisibleForTesting
+   void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+     firstSeenDead.remove(address);
+     Client tserver = null;
+     try {
 -      tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, conf);
 -      tserver.removeLogs(Tracer.traceInfo(), SystemCredentials.get().toThrift(instance), paths2strings(entry.getValue()));
++      tserver = ThriftUtil.getClient(new TabletClientService.Client.Factory(), address, context);
++      tserver.removeLogs(Tracer.traceInfo(), context.rpcCreds(), paths2strings(entry.getValue()));
+       log.debug("asked tserver to delete " + entry.getValue() + " from " + entry.getKey());
+       status.currentLog.deleted += entry.getValue().size();
+     } catch (TException e) {
+       log.warn("Error talking to " + address + ": " + e);
+     } finally {
+       if (tserver != null)
+         ThriftUtil.returnClient(tserver);
+     }
+   }
+ 
+   /**
+    * Get the configured wait period a server has to be dead.
+    * <p>
+    * The property is "gc.wal.dead.server.wait" defined in Property.GC_WAL_DEAD_SERVER_WAIT and is a duration. Valid values include a unit with no space like
+    * 3600s, 5m or 2h.
+    *
+    * @param conf
+    *          AccumuloConfiguration
+    * @return long that represents the millis to wait
+    */
+   @VisibleForTesting
+   long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+     return conf.getTimeInMillis(Property.GC_WAL_DEAD_SERVER_WAIT);
+   }
+ 
+   /**
+    * Remove walogs associated with a tserver that no longer has a lock.
+    * <p>
+    * There is a configuration option, see #getGCWALDeadServerWaitTime, that defines how long a server must be "dead" before removing the associated write ahead
+    * log files. The intent is to ensure that recovery succeeds for the tablets that were hosted on that tserver.
+    *
+    * @param address
+    *          HostAndPort of the tserver with no lock
+    * @param conf
+    *          AccumuloConfiguration to get the gc.wal.dead.server.wait info
+    * @param entry
+    *          The WALOG path
+    * @param status
+    *          GCStatus for tracking changes
+    */
+   @VisibleForTesting
+   void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+     // tserver is down, only delete once configured time has passed
+     if (timeToDelete(address, getGCWALDeadServerWaitTime(conf))) {
+       for (Path path : entry.getValue()) {
+         log.debug("Removing WAL for offline server " + address + " at " + path);
          try {
-           if (fs.exists(swalog)) {
-             log.error("Unable to delete sorted walog " + swalog + ": " + ioe);
+           if (!useTrash || !fs.moveToTrash(path)) {
+             fs.deleteRecursively(path);
            }
+           status.currentLog.deleted++;
+         } catch (FileNotFoundException ex) {
+           // ignored
          } catch (IOException ex) {
-           log.error("Unable to check for the existence of " + swalog, ex);
+           log.error("Unable to delete wal " + path + ": " + ex);
          }
        }
+       firstSeenDead.remove(address);
+     } else {
+       log.debug("Not removing " + entry.getValue().size() + " WAL(s) for offline server since it has not be long enough: " + address);
      }
+   }
  
-     return 0;
+   /**
+    * Removes old style WAL entries.
+    * <p>
+    * The format for storing WAL info in the metadata table changed at some point, maybe the 1.5 release. Once that is known for sure and we no longer support
+    * upgrading from that version, this code should be removed
+    *
+    * @param entry
+    *          Map of empty server address to List of Paths
+    * @param status
+    *          GCStatus object
+    */
+   @VisibleForTesting
+   void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+     // old-style log entry, just remove it
+     for (Path path : entry.getValue()) {
+       log.debug("Removing old-style WAL " + path);
+       try {
+         if (!useTrash || !fs.moveToTrash(path))
+           fs.deleteRecursively(path);
+         status.currentLog.deleted++;
+       } catch (FileNotFoundException ex) {
+         // ignored
+       } catch (IOException ex) {
+         log.error("Unable to delete wal " + path + ": " + ex);
+       }
+     }
    }
  
    /**
@@@ -311,14 -421,13 +453,15 @@@
      return result;
    }
  
-   protected int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+   @VisibleForTesting
+   int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
        InterruptedException {
      int count = 0;
 -    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(SystemCredentials.get());
 +    Iterator<LogEntry> iterator = MetadataTableUtil.getLogEntries(context);
  
 +    // For each WAL reference in the metadata table
      while (iterator.hasNext()) {
 +      // Each metadata reference has at least one WAL file
        for (String entry : iterator.next().logSet) {
          // old style WALs will have the IP:Port of their logger and new style will either be a Path either absolute or relative, in all cases
          // the last "/" will mark a UUID file name.
@@@ -341,101 -448,8 +484,102 @@@
      return count;
    }
  
 +  protected int removeReplicationEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
 +      InterruptedException {
 +    Connector conn;
 +    try {
 +      conn = context.getConnector();
 +    } catch (AccumuloException | AccumuloSecurityException e) {
 +      log.error("Failed to get connector", e);
 +      throw new IllegalArgumentException(e);
 +    }
 +
 +    int count = 0;
 +
 +    Iterator<Entry<String,Path>> walIter = nameToFileMap.entrySet().iterator();
 +
 +    while (walIter.hasNext()) {
 +      Entry<String,Path> wal = walIter.next();
 +      String fullPath = wal.getValue().toString();
 +      if (neededByReplication(conn, fullPath)) {
 +        log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
 +        // If we haven't already removed it, check to see if this WAL is
 +        // "in use" by replication (needed for replication purposes)
 +        status.currentLog.inUse++;
 +
 +        walIter.remove();
 +        sortedWALogs.remove(wal.getKey());
 +      } else {
 +        log.debug("WAL not needed for replication {}", fullPath);
 +      }
 +      count++;
 +    }
 +
 +    return count;
 +  }
 +
 +  /**
 +   * Determine if the given WAL is needed for replication
 +   *
 +   * @param wal
 +   *          The full path (URI)
 +   * @return True if the WAL is still needed by replication (not a candidate for deletion)
 +   */
 +  protected boolean neededByReplication(Connector conn, String wal) {
 +    log.info("Checking replication table for " + wal);
 +
 +    Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
 +
 +    // TODO Push down this filter to the tserver to only return records
 +    // that are not completely replicated and convert this loop into a
 +    // `return s.iterator.hasNext()` statement
 +    for (Entry<Key,Value> entry : iter) {
 +      try {
 +        Status status = Status.parseFrom(entry.getValue().get());
 +        log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status));
 +        if (!StatusUtil.isSafeForRemoval(status)) {
 +          return true;
 +        }
 +      } catch (InvalidProtocolBufferException e) {
 +        log.error("Could not deserialize Status protobuf for " + entry.getKey(), e);
 +      }
 +    }
 +
 +    return false;
 +  }
 +
 +  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
 +    Scanner metaScanner;
 +    try {
 +      metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
 +    } catch (TableNotFoundException e) {
 +      throw new RuntimeException(e);
 +    }
 +
 +    // Need to add in the replication section prefix
 +    metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
 +    // Limit the column family to be sure
 +    metaScanner.fetchColumnFamily(ReplicationSection.COLF);
 +
 +    try {
 +      Scanner replScanner = ReplicationTable.getScanner(conn);
 +
 +      // Scan only the Status records
 +      StatusSection.limit(replScanner);
 +
 +      // Only look for this specific WAL
 +      replScanner.setRange(Range.exact(wal));
 +
 +      return Iterables.concat(metaScanner, replScanner);
 +    } catch (ReplicationTableOfflineException e) {
 +      // do nothing
 +    }
 +
 +    return metaScanner;
 +  }
 +
-   private int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+   @VisibleForTesting
+   int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
      return scanServers(ServerConstants.getWalDirs(), fileToServerMap, nameToFileMap);
    }
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/5f02d564/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --cc server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 5801faa,03f5c96..bc9fca3
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@@ -22,58 -23,46 +23,75 @@@ import static org.easymock.EasyMock.rep
  
  import java.io.FileNotFoundException;
  import java.io.IOException;
- import java.util.ArrayList;
 -
 +import java.util.Collections;
- import java.util.HashMap;
 +import java.util.Iterator;
 +import java.util.LinkedList;
+ import java.util.ArrayList;
+ import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
- import java.util.Map.Entry;
  import java.util.UUID;
  
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
  import org.apache.accumulo.core.client.Instance;
- import org.apache.accumulo.core.client.mock.MockInstance;
- import org.apache.accumulo.core.conf.AccumuloConfiguration;
 +import org.apache.accumulo.core.conf.ConfigurationCopy;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.conf.SiteConfiguration;
 +import org.apache.accumulo.core.data.Key;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.Value;
- import org.apache.accumulo.core.gc.thrift.GCStatus;
- import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 +import org.apache.accumulo.core.metadata.MetadataTable;
 +import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 +import org.apache.accumulo.core.protobuf.ProtobufUtil;
 +import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 +import org.apache.accumulo.core.replication.ReplicationTable;
 +import org.apache.accumulo.server.AccumuloServerContext;
 +import org.apache.accumulo.server.conf.ServerConfigurationFactory;
  import org.apache.accumulo.server.fs.VolumeManager;
 +import org.apache.accumulo.server.replication.StatusUtil;
 +import org.apache.accumulo.server.replication.proto.Replication.Status;
- import org.apache.hadoop.fs.FileStatus;
- import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.io.Text;
 +import org.easymock.EasyMock;
 +import org.easymock.IAnswer;
 +import org.junit.Assert;
+ import org.apache.accumulo.core.conf.AccumuloConfiguration;
+ import org.apache.accumulo.core.gc.thrift.GCStatus;
+ import org.apache.hadoop.fs.FileStatus;
+ import org.apache.hadoop.fs.Path;
+ 
  import org.junit.Before;
 +import org.junit.Rule;
  import org.junit.Test;
 +import org.junit.rules.TestName;
 +
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Maps;
  
+ import org.apache.accumulo.core.client.mock.MockInstance;
+ import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+ import org.apache.accumulo.server.fs.VolumeManagerImpl;
+ import org.apache.zookeeper.KeeperException;
+ 
+ import java.io.File;
+ import java.util.Arrays;
+ import java.util.LinkedHashMap;
+ import java.util.Map.Entry;
+ 
+ import static org.easymock.EasyMock.createMock;
+ import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertSame;
+ import static org.junit.Assert.assertTrue;
+ import static java.lang.Thread.sleep;
+ 
+ import java.io.FileOutputStream;
+ 
+ import org.apache.commons.io.FileUtils;
+ 
+ import java.util.concurrent.TimeUnit;
+ 
  public class GarbageCollectWriteAheadLogsTest {
    private static final long BLOCK_SIZE = 64000000L;
  
@@@ -370,198 -261,287 +388,484 @@@
      assertFalse(GarbageCollectWriteAheadLogs.isUUID(null));
    }
  
 +  // It was easier to do this than get the mocking working for me
 +  private static class ReplicationGCWAL extends GarbageCollectWriteAheadLogs {
 +
 +    private List<Entry<Key,Value>> replData;
 +
 +    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
 +      super(context, fs, useTrash);
 +      this.replData = replData;
 +    }
 +
 +    @Override
 +    protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, String wal) {
 +      return this.replData;
 +    }
 +  }
 +
 +  @Test
 +  public void replicationEntriesAffectGC() throws Exception {
 +    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
 +    Connector conn = createMock(Connector.class);
 +
 +    // Write a Status record which should prevent file1 from being deleted
 +    LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
 +    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 +
 +    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
 +
 +    replay(conn);
 +
 +    // Open (not-closed) file must be retained
 +    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
 +
 +    // No replication data, not needed
 +    replData.clear();
 +    assertFalse(replGC.neededByReplication(conn, "/wals/" + file2));
 +
 +    // The file is closed but not replicated, must be retained
 +    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileClosedValue()));
 +    assertTrue(replGC.neededByReplication(conn, "/wals/" + file1));
 +
 +    // File is closed and fully replicated, can be deleted
 +    replData.clear();
 +    replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"),
 +        ProtobufUtil.toValue(Status.newBuilder().setInfiniteEnd(true).setBegin(Long.MAX_VALUE).setClosed(true).build())));
 +    assertFalse(replGC.neededByReplication(conn, "/wals/" + file1));
 +  }
 +
 +  @Test
 +  public void removeReplicationEntries() throws Exception {
 +    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
 +
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    long file1CreateTime = System.currentTimeMillis();
 +    long file2CreateTime = file1CreateTime + 50;
 +    BatchWriter bw = ReplicationTable.getBatchWriter(context.getConnector());
 +    Mutation m = new Mutation("/wals/" + file1);
 +    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
 +    bw.addMutation(m);
 +    m = new Mutation("/wals/" + file2);
 +    StatusSection.add(m, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
 +    bw.addMutation(m);
 +
 +    // These WALs are potential candidates for deletion from fs
 +    Map<String,Path> nameToFileMap = new HashMap<>();
 +    nameToFileMap.put(file1, new Path("/wals/" + file1));
 +    nameToFileMap.put(file2, new Path("/wals/" + file2));
 +
 +    Map<String,Path> sortedWALogs = Collections.emptyMap();
 +
 +    // Make the GCStatus and GcCycleStats
 +    GCStatus status = new GCStatus();
 +    GcCycleStats cycleStats = new GcCycleStats();
 +    status.currentLog = cycleStats;
 +
 +    // We should iterate over two entries
 +    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
 +
 +    // We should have noted that two files were still in use
 +    Assert.assertEquals(2l, cycleStats.inUse);
 +
 +    // Both should have been deleted
 +    Assert.assertEquals(0, nameToFileMap.size());
 +  }
 +
 +  @Test
 +  public void replicationEntriesOnlyInMetaPreventGC() throws Exception {
 +    String file1 = UUID.randomUUID().toString(), file2 = UUID.randomUUID().toString();
 +
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +
 +    Connector conn = context.getConnector();
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    long file1CreateTime = System.currentTimeMillis();
 +    long file2CreateTime = file1CreateTime + 50;
 +    // Write some records to the metadata table, we haven't yet written status records to the replication table
 +    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file1);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file1CreateTime));
 +    bw.addMutation(m);
 +
 +    m = new Mutation(ReplicationSection.getRowPrefix() + "/wals/" + file2);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(file2CreateTime));
 +    bw.addMutation(m);
 +
 +    // These WALs are potential candidates for deletion from fs
 +    Map<String,Path> nameToFileMap = new HashMap<>();
 +    nameToFileMap.put(file1, new Path("/wals/" + file1));
 +    nameToFileMap.put(file2, new Path("/wals/" + file2));
 +
 +    Map<String,Path> sortedWALogs = Collections.emptyMap();
 +
 +    // Make the GCStatus and GcCycleStats objects
 +    GCStatus status = new GCStatus();
 +    GcCycleStats cycleStats = new GcCycleStats();
 +    status.currentLog = cycleStats;
 +
 +    // We should iterate over two entries
 +    Assert.assertEquals(2, gcWALs.removeReplicationEntries(nameToFileMap, sortedWALogs, status));
 +
 +    // We should have noted that two files were still in use
 +    Assert.assertEquals(2l, cycleStats.inUse);
 +
 +    // Both should have been deleted
 +    Assert.assertEquals(0, nameToFileMap.size());
 +  }
 +
 +  @Test
 +  public void noReplicationTableDoesntLimitMetatdataResults() throws Exception {
 +    Instance inst = new MockInstance(testName.getMethodName());
 +    AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 +    Connector conn = context.getConnector();
 +
 +    String wal = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
 +    BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation m = new Mutation(ReplicationSection.getRowPrefix() + wal);
 +    m.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(System.currentTimeMillis()));
 +    bw.addMutation(m);
 +    bw.close();
 +
 +    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
 +
 +    Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
 +    Entry<Key,Value> entry = Iterables.getOnlyElement(data);
 +
 +    Assert.assertEquals(ReplicationSection.getRowPrefix() + wal, entry.getKey().getRow().toString());
 +  }
 +
 +  /**
 +   * Writes a replication entry for the same WAL to both the metadata table and the replication table, then checks that getReplicationStatusForFile returns one
 +   * entry from each of them.
 +   */
 +  @Test
 +  public void fetchesReplicationEntriesFromMetadataAndReplicationTables() throws Exception {
 +    AccumuloServerContext serverContext = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance(testName.getMethodName())));
 +    Connector connector = serverContext.getConnector();
 +
 +    final long createdAt = System.currentTimeMillis();
 +    final String walPath = "hdfs://localhost:8020/accumulo/wal/tserver+port/123456-1234-1234-12345678";
 +
 +    // Metadata table entry: the row is the replication-section prefix followed by the WAL path
 +    BatchWriter writer = connector.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
 +    Mutation metadataRecord = new Mutation(ReplicationSection.getRowPrefix() + walPath);
 +    metadataRecord.put(ReplicationSection.COLF, new Text("1"), StatusUtil.fileCreatedValue(createdAt));
 +    writer.addMutation(metadataRecord);
 +    writer.close();
 +
 +    // Replication table entry: the row is the bare WAL path
 +    writer = ReplicationTable.getBatchWriter(connector);
 +    Mutation replicationRecord = new Mutation(walPath);
 +    StatusSection.add(replicationRecord, new Text("1"), StatusUtil.fileCreatedValue(createdAt));
 +    writer.addMutation(replicationRecord);
 +    writer.close();
 +
 +    GarbageCollectWriteAheadLogs collector = new GarbageCollectWriteAheadLogs(serverContext, volMgr, false);
 +
 +    // Gather the results keyed by Key so they can be counted and inspected
 +    Map<Key,Value> found = new HashMap<>();
 +    for (Entry<Key,Value> result : collector.getReplicationStatusForFile(connector, walPath)) {
 +      found.put(result.getKey(), result.getValue());
 +    }
 +
 +    Assert.assertEquals(2, found.size());
 +
 +    // Should get one element from each table (metadata and replication)
 +    for (Key key : found.keySet()) {
 +      String row = key.getRow().toString();
 +      boolean prefixedRow = row.startsWith(ReplicationSection.getRowPrefix());
 +      if (!prefixedRow) {
 +        Assert.assertEquals(walPath, row);
 +        continue;
 +      }
 +      Assert.assertTrue(row.endsWith(walPath));
 +    }
 +  }
++
+   /**
+    * Checks that timeToDelete returns false on the first call for an address and true on a call made after twice the wait period has passed.
+    */
+   @Test
+   public void testTimeToDeleteTrue() throws InterruptedException {
+     final HostAndPort deadServer = HostAndPort.fromString("tserver1:9998");
+     final long waitMillis = AccumuloConfiguration.getTimeInMillis("1s");
+ 
+     gcwal.clearFirstSeenDead();
+     boolean firstAnswer = gcwal.timeToDelete(deadServer, waitMillis);
+     assertFalse("First call should be false and should store the first seen time", firstAnswer);
+ 
+     // Let more than the wait period go by before asking again
+     sleep(waitMillis * 2);
+     boolean secondAnswer = gcwal.timeToDelete(deadServer, waitMillis);
+     assertTrue(secondAnswer);
+   }
+ 
+   /**
+    * Checks that two back-to-back timeToDelete calls both return false when the configured wait (one hour here) has not elapsed.
+    */
+   @Test
+   public void testTimeToDeleteFalse() {
+     HostAndPort address = HostAndPort.fromString("tserver1:9998");
+     long wait = AccumuloConfiguration.getTimeInMillis("1h");
+     long t1, t2;
+     boolean ttd;
+     // Each attempt is timed; an attempt that itself took too long is discarded and repeated
+     do {
+       t1 = System.nanoTime();
+       // start from a clean slate so the next call is treated as the first sighting of this address
+       gcwal.clearFirstSeenDead();
+       assertFalse("First call should be false and should store the first seen time", gcwal.timeToDelete(address, wait));
+       ttd = gcwal.timeToDelete(address, wait);
+       t2 = System.nanoTime();
+     } while (TimeUnit.NANOSECONDS.toMillis(t2 - t1) > (wait / 2)); // retry only if the attempt took MORE than half of the configured wait
+ 
+     assertFalse(ttd);
+   }
+ 
+   @Test
+   public void testTimeToDeleteWithNullAddress() {
+     assertFalse(gcwal.timeToDelete(null, 123l));
+   }
+ 
+   /**
+    * Wrapper class with some helper methods
+    * <p>
+    * Just a wrapper around a LinkedHashMap that store method name and argument information. Also includes some convenience methods to make usage cleaner.
+    */
+   class MethodCalls {
+ 
+     private LinkedHashMap<String,List<Object>> mapWrapper;
+ 
+     public MethodCalls() {
+       mapWrapper = new LinkedHashMap<String,List<Object>>();
+     }
+ 
+     public void put(String methodName, Object... args) {
+       mapWrapper.put(methodName, Arrays.asList(args));
+     }
+ 
+     public int size() {
+       return mapWrapper.size();
+     }
+ 
+     public boolean hasOneEntry() {
+       return size() == 1;
+     }
+ 
+     public Map.Entry<String,List<Object>> getFirstEntry() {
+       return mapWrapper.entrySet().iterator().next();
+     }
+ 
+     public String getFirstEntryMethod() {
+       return getFirstEntry().getKey();
+     }
+ 
+     public List<Object> getFirstEntryArgs() {
+       return getFirstEntry().getValue();
+     }
+ 
+     public Object getFirstEntryArg(int number) {
+       return getFirstEntryArgs().get(number);
+     }
+   }
+ 
+   /**
+    * Partial mock of GarbageCollectWriteAheadLogs for testing the removeFiles method.
+    * <p>
+    * holdsLock returns the boolean given to the constructor, and each overridden removal method only records its name and arguments in methodCalls instead of
+    * doing any work, so tests can assert which method removeFiles chose and what it passed.
+    */
+   class GCWALPartialMock extends GarbageCollectWriteAheadLogs {
+ 
+     // value returned by holdsLock for every address
+     private boolean holdsLockBool = false;
+ 
 -    public GCWALPartialMock(Instance i, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
 -      super(i, vm, useTrash);
++    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
++      super(ctx, vm, useTrash);
+       this.holdsLockBool = holdLock;
+     }
+ 
+     // record of the overridden methods invoked on this mock, keyed by the name strings used below
+     public MethodCalls methodCalls = new MethodCalls();
+ 
+     @Override
+     boolean holdsLock(HostAndPort addr) {
+       return holdsLockBool;
+     }
+ 
+     @Override
+     void removeWALfromDownTserver(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+       // note: the recorded key is spelled "removeWALFromDownTserver" (capital F), which differs from the method name; tests compare against this key
+       methodCalls.put("removeWALFromDownTserver", address, conf, entry, status);
+     }
+ 
+     @Override
+     void askTserverToRemoveWAL(HostAndPort address, AccumuloConfiguration conf, Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+       methodCalls.put("askTserverToRemoveWAL", address, conf, entry, status);
+     }
+ 
+     @Override
+     void removeOldStyleWAL(Entry<String,ArrayList<Path>> entry, final GCStatus status) {
+       methodCalls.put("removeOldStyleWAL", entry, status);
+     }
+ 
+     @Override
+     void removeSortedWAL(Path swalog) {
+       methodCalls.put("removeSortedWAL", swalog);
+     }
+   }
+ 
+   /**
+    * Builds a GCWALPartialMock backed by a MockInstance named "accumulo" and the VolumeManager returned by VolumeManagerImpl.get(), with trash disabled.
+    *
+    * @param s
+    *          not used by this helper
+    * @param locked
+    *          the value the mock's holdsLock method will return
+    */
+   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
 -    return new GCWALPartialMock(new MockInstance("accumulo"), VolumeManagerImpl.get(), false, locked);
++    AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
++    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+   }
+ 
+   /**
+    * Returns a new, empty, mutable map from name to Path.
+    */
+   private Map<String,Path> getEmptyMap() {
+     Map<String,Path> nothing = new HashMap<>();
+     return nothing;
+   }
+ 
+   /**
+    * Returns a new map holding a single entry: the given key mapped to a mutable list that contains only singlePath.
+    */
+   private Map<String,ArrayList<Path>> getServerToFileMap1(String key, Path singlePath) {
+     ArrayList<Path> files = new ArrayList<>();
+     files.add(singlePath);
+ 
+     Map<String,ArrayList<Path>> filesByServer = new HashMap<>();
+     filesByServer.put(key, files);
+     return filesByServer;
+   }
+ 
+   /**
+    * A server-to-file entry keyed by the empty string should cause removeFiles to call removeOldStyleWAL, and only that, with the entry and the status.
+    */
+   @Test
+   public void testRemoveFilesWithOldStyle() throws IOException {
+     GCStatus status = new GCStatus();
+     GCWALPartialMock partialMock = getGCWALForRemoveFileTest(status, true);
+     Path walPath = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+     Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1("", walPath);
+ 
+     partialMock.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+ 
+     MethodCalls recorded = partialMock.methodCalls;
+     assertEquals("Only one method should have been called", 1, recorded.size());
+     assertEquals("Method should be removeOldStyleWAL", "removeOldStyleWAL", recorded.getFirstEntryMethod());
+ 
+     // The recorded arguments should be the single map entry followed by the status
+     Entry<String,ArrayList<Path>> expectedEntry = serverToFileMap.entrySet().iterator().next();
+     assertEquals("First param should be empty", expectedEntry, recorded.getFirstEntryArg(0));
+     assertEquals("Second param should be the status", status, recorded.getFirstEntryArg(1));
+   }
+ 
+   /**
+    * When the mock reports that the tserver does not hold its lock, removeFiles should call removeWALfromDownTserver, and only that, passing the server
+    * address, an AccumuloConfiguration, the map entry and the status.
+    */
+   @Test
+   public void testRemoveFilesWithDeadTservers() throws IOException {
+     GCStatus status = new GCStatus();
+     // keep the mock's own type so methodCalls can be read without a downcast
+     GCWALPartialMock realGCWAL = getGCWALForRemoveFileTest(status, false);
+     String server = "tserver1+9997";
+     Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+     Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+ 
+     realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+ 
+     MethodCalls calls = realGCWAL.methodCalls;
+     assertEquals("Only one method should have been called", 1, calls.size());
+     // the mock records this call under the key "removeWALFromDownTserver"
+     assertEquals("Method should be removeWALfromDownTserver", "removeWALFromDownTserver", calls.getFirstEntryMethod());
+     // the directory name uses '+' between host and port; a literal replace is enough, no regex needed
+     assertEquals("First param should be address", HostAndPort.fromString(server.replace("+", ":")), calls.getFirstEntryArg(0));
+     assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+     Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+     assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+     assertEquals("Fourth param should be the status", status, calls.getFirstEntryArg(3));
+   }
+ 
+   /**
+    * When the mock reports that the tserver holds its lock, removeFiles should call askTserverToRemoveWAL, and only that, passing the server address, an
+    * AccumuloConfiguration, the map entry and the status.
+    */
+   @Test
+   public void testRemoveFilesWithLiveTservers() throws IOException {
+     GCStatus status = new GCStatus();
+     // keep the mock's own type so methodCalls can be read without a downcast
+     GCWALPartialMock realGCWAL = getGCWALForRemoveFileTest(status, true);
+     String server = "tserver1+9997";
+     Path p1 = new Path("hdfs://localhost:9000/accumulo/wal/" + server + "/" + UUID.randomUUID().toString());
+     Map<String,ArrayList<Path>> serverToFileMap = getServerToFileMap1(server, p1);
+ 
+     realGCWAL.removeFiles(getEmptyMap(), serverToFileMap, getEmptyMap(), status);
+ 
+     MethodCalls calls = realGCWAL.methodCalls;
+     assertEquals("Only one method should have been called", 1, calls.size());
+     assertEquals("Method should be askTserverToRemoveWAL", "askTserverToRemoveWAL", calls.getFirstEntryMethod());
+     // the directory name uses '+' between host and port; a literal replace is enough, no regex needed
+     assertEquals("First param should be address", HostAndPort.fromString(server.replace("+", ":")), calls.getFirstEntryArg(0));
+     assertTrue("Second param should be an AccumuloConfiguration", calls.getFirstEntryArg(1) instanceof AccumuloConfiguration);
+     Entry<String,ArrayList<Path>> firstServerToFileMap = serverToFileMap.entrySet().iterator().next();
+     assertEquals("Third param should be the entry", firstServerToFileMap, calls.getFirstEntryArg(2));
+     assertEquals("Fourth param should be the status", status, calls.getFirstEntryArg(3));
+   }
+ 
+   /**
+    * With no server entries and a single sorted WAL, removeFiles should call removeSortedWAL, and only that, with the sorted WAL's path.
+    */
+   @Test
+   public void testRemoveFilesRemovesSortedWALs() throws IOException {
+     GCStatus status = new GCStatus();
+     GCWALPartialMock partialMock = getGCWALForRemoveFileTest(status, true);
+ 
+     Path sortedWalPath = new Path("hdfs://localhost:9000/accumulo/wal/tserver1+9997/" + UUID.randomUUID().toString());
+     Map<String,Path> sortedWALogs = new HashMap<>();
+     sortedWALogs.put("junk", sortedWalPath); // TODO: see if this key is actually used here, maybe can be removed
+     Map<String,ArrayList<Path>> noServers = new HashMap<>();
+ 
+     partialMock.removeFiles(getEmptyMap(), noServers, sortedWALogs, status);
+ 
+     MethodCalls recorded = partialMock.methodCalls;
+     assertEquals("Only one method should have been called", 1, recorded.size());
+     assertEquals("Method should be removeSortedWAL", "removeSortedWAL", recorded.getFirstEntryMethod());
+     assertEquals("First param should be the Path", sortedWalPath, recorded.getFirstEntryArg(0));
+   }
+ 
+   static String GCWAL_DEAD_DIR = "gcwal-collect-deadtserver";
+   static String GCWAL_DEAD_TSERVER = "tserver1";
+   static String GCWAL_DEAD_TSERVER_PORT = "9995";
+   static String GCWAL_DEAD_TSERVER_COLLECT_FILE = UUID.randomUUID().toString();
+ 
+   /**
+    * GarbageCollectWriteAheadLogs subclass used by testCollectWithDeadTserver.
+    * <p>
+    * It replaces the collaborating steps with fixed answers: no server ever holds its lock, there are no sorted WALs, scanServers reports one WAL file under
+    * the local target directory, and removeMetadataEntries does nothing.
+    */
+   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
+ 
 -    public GCWALDeadTserverCollectMock(Instance i, VolumeManager vm, boolean useTrash) throws IOException {
 -      super(i, vm, useTrash);
++    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
++      super(ctx, vm, useTrash);
+     }
+ 
+     @Override
+     boolean holdsLock(HostAndPort addr) {
+       // overridden because the real method tries to use zookeeper; always answer "no lock held"
+       return false;
+     }
+ 
+     @Override
+     Map<String,Path> getSortedWALogs() {
+       // no sorted WALs in this scenario
+       return new HashMap<String,Path>();
+     }
+ 
+     @Override
+     int scanServers(Map<Path,String> fileToServerMap, Map<String,Path> nameToFileMap) throws Exception {
+       // Report exactly one WAL: <user.dir>/target/<GCWAL_DEAD_DIR>/<tserver>+<port>/<file>, owned by <tserver>:<port>
+       String sep = File.separator;
+       Path p = new Path(System.getProperty("user.dir") + sep + "target" + sep + GCWAL_DEAD_DIR + sep + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT + sep
+           + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+       fileToServerMap.put(p, GCWAL_DEAD_TSERVER + ":" + GCWAL_DEAD_TSERVER_PORT);
+       nameToFileMap.put(GCWAL_DEAD_TSERVER_COLLECT_FILE, p);
+       return 1;
+     }
+ 
+     @Override
+     int removeMetadataEntries(Map<String,Path> nameToFileMap, Map<String,Path> sortedWALogs, GCStatus status) throws IOException, KeeperException,
+         InterruptedException {
+       // leave both maps untouched and report zero
+       return 0;
+     }
+ 
+     // NOTE(review): no @Override here, unlike the methods above; presumably this overrides a superclass method of the same signature - confirm
+     long getGCWALDeadServerWaitTime(AccumuloConfiguration conf) {
+       // overridden because the real method tries to use zookeeper; fixed wait of 1000 (presumably milliseconds - confirm)
+       return 1000l;
+     }
+   }
+ 
+   /**
+    * Runs collect twice against a WAL file on the local filesystem that belongs to a tserver which never holds its lock.
+    * <p>
+    * The first run must leave the file in place; the second run, two seconds later, must delete it.
+    */
+   @Test
+   public void testCollectWithDeadTserver() throws IOException, InterruptedException {
+     Instance i = new MockInstance();
++    AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(i));
+     // Same location that GCWALDeadTserverCollectMock.scanServers reports: <user.dir>/target/<GCWAL_DEAD_DIR>/<tserver>+<port>/<file>
+     File walDir = new File(System.getProperty("user.dir") + File.separator + "target" + File.separator + GCWAL_DEAD_DIR);
+     File walFileDir = new File(walDir + File.separator + GCWAL_DEAD_TSERVER + "+" + GCWAL_DEAD_TSERVER_PORT);
+     File walFile = new File(walFileDir + File.separator + GCWAL_DEAD_TSERVER_COLLECT_FILE);
+     // the empty WAL file is only created when its directory did not already exist
+     if (!walFileDir.exists()) {
 -      walFileDir.mkdirs();
++      assertTrue("Directory was made", walFileDir.mkdirs());
+       new FileOutputStream(walFile).close();
+     }
+ 
+     try {
+       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
 -      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(i, vm, false);
++      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
+ 
+       // first pass: the file is a candidate but must not be deleted yet
+       gcwal2.collect(status);
+ 
+       assertTrue("File should not be deleted", walFile.exists());
+       assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+       assertEquals("Should not have deleted that file", 0, status.lastLog.getDeleted());
+ 
+       // wait 2000 ms, longer than the 1000 the mock returns from getGCWALDeadServerWaitTime, then collect again
+       sleep(2000);
+       gcwal2.collect(status);
+ 
+       assertFalse("File should be gone", walFile.exists());
+       assertEquals("Should have one candidate", 1, status.lastLog.getCandidates());
+       assertEquals("Should have deleted that file", 1, status.lastLog.getDeleted());
+ 
+     } finally {
+       // always remove the scratch directory, whether or not the assertions passed
+       if (walDir.exists()) {
+         FileUtils.deleteDirectory(walDir);
+       }
+     }
+   }
  }