You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/05/21 18:47:07 UTC

[1/4] accumulo git commit: ACCUMULO-3423 use zookeeper to track WAL state

Repository: accumulo
Updated Branches:
  refs/heads/master cf9b9a4ea -> 47d1a4db4


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
index 9fcfec9..9af60dc 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferencesTest.java
@@ -18,11 +18,8 @@ package org.apache.accumulo.gc.replication;
 
 import static org.easymock.EasyMock.createMock;
 import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.replay;
-import static org.easymock.EasyMock.verify;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,9 +28,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.UUID;
 
-import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
@@ -49,7 +44,6 @@ import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
@@ -70,8 +64,6 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.net.HostAndPort;
 
 public class CloseWriteAheadLogReferencesTest {
@@ -124,130 +116,6 @@ public class CloseWriteAheadLogReferencesTest {
   }
 
   @Test
-  public void findOneWalFromMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    String file = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    data.add(entry("tserver1:9997[1234567890]", file));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(file), wals);
-
-    verify(conn, bs);
-  }
-
-  // This is a silly test now
-  @Test
-  public void findManyRefsToSingleWalFromMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-
-    String uuid = UUID.randomUUID().toString();
-
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-    String filename = "hdfs://localhost:8020/accumulo/wal/tserver+9997/" + uuid;
-    data.add(entry("tserver1:9997[0123456789]", filename));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Collections.singleton(filename), wals);
-
-    verify(conn, bs);
-  }
-
-  @Test
-  public void findRefsToManyWalsFromMetadata() throws Exception {
-    Connector conn = createMock(Connector.class);
-    BatchScanner bs = createMock(BatchScanner.class);
-
-    String file1 = "hdfs://localhost:8020/accumulo/wal/tserver1+9997/" + UUID.randomUUID();
-    String file2 = "hdfs://localhost:8020/accumulo/wal/tserver2+9997/" + UUID.randomUUID();
-    String file3 = "hdfs://localhost:8020/accumulo/wal/tserver3+9997/" + UUID.randomUUID();
-
-    // Fake out some data
-    final ArrayList<Entry<Key,Value>> data = new ArrayList<>();
-
-    data.add(entry("tserver1:9997[1234567890]", file1));
-    data.add(entry("tserver2:9997[1234567891]", file2));
-    data.add(entry("tserver3:9997[1234567891]", file3));
-
-    // Get a batchscanner, scan the tablets section, fetch only the logs
-    expect(conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4)).andReturn(bs);
-    bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-    expectLastCall().once();
-    bs.fetchColumnFamily(CurrentLogsSection.COLF);
-    expectLastCall().once();
-    expect(bs.iterator()).andAnswer(new IAnswer<Iterator<Entry<Key,Value>>>() {
-
-      @Override
-      public Iterator<Entry<Key,Value>> answer() throws Throwable {
-        return data.iterator();
-      }
-
-    });
-    // Close the bs
-    bs.close();
-    expectLastCall().once();
-
-    replay(conn, bs);
-
-    // Validate
-    Set<String> wals = refs.getReferencedWals(conn);
-    Assert.assertEquals(Sets.newHashSet(file1, file2, file3), wals);
-
-    verify(conn, bs);
-  }
-
-  private static Entry<Key,Value> entry(String session, String file) {
-    Key key = new Key(new Text(CurrentLogsSection.getRowPrefix() + session), CurrentLogsSection.COLF, new Text(file));
-    return Maps.immutableEntry(key, new Value());
-  }
-
-  @Test
   public void unusedWalsAreClosed() throws Exception {
     Set<String> wals = Collections.emptySet();
     Instance inst = new MockInstance(testName.getMethodName());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 2b874f6..3ff1aa9 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -68,6 +68,7 @@ import org.apache.accumulo.master.state.TableStats;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.log.WalMarker;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.ClosableIterator;
@@ -82,6 +83,7 @@ import org.apache.accumulo.server.master.state.TabletStateStore;
 import org.apache.accumulo.server.tables.TableManager;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.util.MetadataTableUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
@@ -127,6 +129,8 @@ class TabletGroupWatcher extends Daemon {
     int[] oldCounts = new int[TabletState.values().length];
     EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
 
+    WalMarker wals = new WalMarker(master.getInstance(), ZooReaderWriter.getInstance());
+
     while (this.master.stillMaster()) {
       // slow things down a little, otherwise we spam the logs when there are many wake-up events
       UtilWaitThread.sleep(100);
@@ -242,7 +246,10 @@ class TabletGroupWatcher extends Daemon {
                 assignedToDeadServers.add(tls);
                 if (server.equals(this.master.migrations.get(tls.extent)))
                   this.master.migrations.remove(tls.extent);
-                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+                TServerInstance tserver = tls.futureOrCurrent();
+                if (!logsForDeadServers.containsKey(tserver)) {
+                  logsForDeadServers.put(tserver, wals.getWalsInUse(tserver));
+                }
                 break;
               case UNASSIGNED:
                 // maybe it's a finishing migration
@@ -276,7 +283,9 @@ class TabletGroupWatcher extends Daemon {
                 break;
               case ASSIGNED_TO_DEAD_SERVER:
                 assignedToDeadServers.add(tls);
-                MetadataTableUtil.fetchLogsForDeadServer(master, master.getMasterLock(), tls.extent, tls.futureOrCurrent(), logsForDeadServers);
+                if (!logsForDeadServers.containsKey(tls.futureOrCurrent())) {
+                  logsForDeadServers.put(tls.futureOrCurrent(), wals.getWalsInUse(tls.futureOrCurrent()));
+                }
                 break;
               case HOSTED:
                 TServerConnection conn = this.master.tserverSet.getConnection(server);
@@ -296,7 +305,6 @@ class TabletGroupWatcher extends Daemon {
         }
 
         flushChanges(destinations, assignments, assigned, assignedToDeadServers, logsForDeadServers, unassigned);
-        store.markLogsAsUnused(master, logsForDeadServers);
 
         // provide stats after flushing changes to avoid race conditions w/ delete table
         stats.end(masterState);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 7154732..23a4b34 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,6 +161,8 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.log.SortedLogState;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -319,6 +321,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   private final ServerConfigurationFactory confFactory;
 
   private final ZooAuthenticationKeyWatcher authKeyWatcher;
+  private final WalMarker walMarker;
 
   public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
     super(confFactory);
@@ -364,6 +367,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         TabletLocator.clearLocators();
       }
     }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
+    walMarker = new WalMarker(instance, ZooReaderWriter.getInstance());
 
     // Create the secret manager
     setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));
@@ -2381,6 +2385,12 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       throw new RuntimeException("Failed to start the tablet client service", e1);
     }
     announceExistence();
+    try {
+      walMarker.initWalMarker(getTabletSession());
+    } catch (Exception e) {
+      log.error("Unable to create WAL marker node in zookeeper", e);
+      throw new RuntimeException(e);
+    }
 
     ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
 
@@ -3020,39 +3030,31 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
       candidates.removeAll(tablet.getCurrentLogFiles());
     }
     try {
-      Set<Path> filenames = new HashSet<>();
+      TServerInstance session = this.getTabletSession();
       for (DfsLogger candidate : candidates) {
-        filenames.add(candidate.getPath());
+        log.info("Marking " + candidate.getPath() + " as unreferenced");
+        walMarker.walUnreferenced(session, candidate.getPath());
       }
-      MetadataTableUtil.markLogUnused(this, this.getLock(), this.getTabletSession(), filenames);
       synchronized (closedLogs) {
         closedLogs.removeAll(candidates);
       }
-    } catch (AccumuloException ex) {
+    } catch (WalMarkerException ex) {
       log.info(ex.toString(), ex);
     }
   }
 
-  public void addLoggersToMetadata(DfsLogger copy, TabletLevel level) {
-    // serialize the updates to the metadata per level: avoids updating the level more than once
-    // updating one level, may cause updates to other levels, so we need to release the lock on metadataTableLogs
-    synchronized (levelLocks[level.ordinal()]) {
-      EnumSet<TabletLevel> set = null;
-      set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
-      if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
-        log.info("Writing log marker for level " + level + " " + copy.getFileName());
-        MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), level);
-      }
-      set = metadataTableLogs.get(copy);
-      set.add(level);
-    }
+  public void addNewLogMarker(DfsLogger copy) throws WalMarkerException {
+    log.info("Writing log marker for " + copy.getPath());
+    walMarker.addNewWalMarker(getTabletSession(), copy.getPath());
   }
 
-  public void walogClosed(DfsLogger currentLog) {
+  public void walogClosed(DfsLogger currentLog) throws WalMarkerException {
     metadataTableLogs.remove(currentLog);
     synchronized (closedLogs) {
       closedLogs.add(currentLog);
     }
+    log.info("Marking " + currentLog.getPath() + " as closed");
+    walMarker.closeWal(getTabletSession(), currentLog.getPath());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 809c56a..4836d99 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -41,7 +41,7 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationConfigurationUtil;
 import org.apache.accumulo.core.util.SimpleThreadPool;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.server.TabletLevel;
+import org.apache.accumulo.fate.util.LoggingRunnable;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.replication.StatusUtil;
@@ -235,7 +235,7 @@ public class TabletServerLogger {
       return;
     }
     nextLogMaker = new SimpleThreadPool(1, "WALog creator");
-    nextLogMaker.submit(new Runnable() {
+    nextLogMaker.submit(new LoggingRunnable(log, new Runnable() {
       @Override
       public void run() {
         final ServerResources conf = tserver.getServerConfig();
@@ -248,6 +248,7 @@ public class TabletServerLogger {
             alog.open(tserver.getClientAddressString());
             String fileName = alog.getFileName();
             log.debug("Created next WAL " + fileName);
+            tserver.addNewLogMarker(alog);
             while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
               log.info("Our WAL was not used for 12 hours: " + fileName);
             }
@@ -280,7 +281,7 @@ public class TabletServerLogger {
           }
         }
       }
-    });
+    }));
   }
 
   public void resetLoggers() throws IOException {
@@ -348,8 +349,6 @@ public class TabletServerLogger {
               try {
                 // Scribble out a tablet definition and then write to the metadata table
                 defineTablet(commitSession);
-                if (currentLogId == logId.get())
-                  tserver.addLoggersToMetadata(copy, TabletLevel.getLevel(commitSession.getExtent()));
               } finally {
                 commitSession.finishUpdatingLogsUsed();
               }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
index 03d783c..b1c010c 100644
--- a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -18,7 +18,9 @@ package org.apache.accumulo.test;
 
 import static org.junit.Assert.assertEquals;
 
+import java.util.List;
 import java.util.Map.Entry;
+import java.util.UUID;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
@@ -30,10 +32,12 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.RawLocalFileSystem;
@@ -118,13 +122,12 @@ public class UnusedWALIT extends ConfigurableMacIT {
   }
 
   private int getWALCount(Connector c) throws Exception {
-    Scanner s = c.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(CurrentLogsSection.getRange());
-    try {
-      return Iterators.size(s.iterator());
-    } finally {
-      s.close();
+    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    int result = 0;
+    for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
+      result += entry.getValue().size();
     }
+    return result;
   }
 
   private void writeSomeData(Connector conn, String table, int startRow, int rowCount, int startCol, int colCount) throws Exception {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 2b24219..8f4fe75 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -65,7 +65,10 @@ import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.init.Initialize;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.util.Admin;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -375,8 +378,7 @@ public class VolumeIT extends ConfigurableMacIT {
     bw.close();
   }
 
-  private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws AccumuloException, AccumuloSecurityException,
-      TableExistsException, TableNotFoundException, MutationsRejectedException {
+  private void verifyVolumesUsed(String tableName, boolean shouldExist, Path... paths) throws Exception {
 
     Connector conn = getConnector();
 
@@ -426,19 +428,14 @@ public class VolumeIT extends ConfigurableMacIT {
       Assert.fail("Unexpected volume " + path);
     }
 
-    Text path = new Text();
-    for (String table : new String[] {RootTable.NAME, MetadataTable.NAME}) {
-      Scanner meta = conn.createScanner(table, Authorizations.EMPTY);
-      meta.setRange(MetadataSchema.CurrentLogsSection.getRange());
-      outer: for (Entry<Key,Value> entry : meta) {
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        for (int i = 0; i < paths.length; i++) {
-          if (path.toString().startsWith(paths[i].toString())) {
-            continue outer;
-          }
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      for (Path path : paths) {
+        if (entry.getKey().toString().startsWith(path.toString())) {
+          continue outer;
         }
-        Assert.fail("Unexpected volume " + path);
       }
+      Assert.fail("Unexpected volume " + entry.getKey());
     }
 
     // if a volume is chosen randomly for each tablet, then the probability that a volume will not be chosen for any tablet is ((num_volumes -

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index f2ceb2c..22e2930 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -45,17 +45,18 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.TabletColumnFamily;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.UtilWaitThread;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.master.state.SetGoalState;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.WatchedEvent;
@@ -199,23 +200,10 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
 
   private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
     Map<String,Boolean> result = new HashMap<>();
-    Scanner root = c.createScanner(RootTable.NAME, EMPTY);
-    root.setRange(CurrentLogsSection.getRange());
-    Scanner meta = c.createScanner(MetadataTable.NAME, EMPTY);
-    meta.setRange(root.getRange());
-    Iterator<Entry<Key,Value>> both = Iterators.concat(root.iterator(), meta.iterator());
-    while (both.hasNext()) {
-      Entry<Key,Value> entry = both.next();
-      Text path = new Text();
-      CurrentLogsSection.getPath(entry.getKey(), path);
-      result.put(path.toString(), entry.getValue().get().length == 0);
-    }
-    String zpath = ZooUtil.getRoot(c.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-    List<String> children = zoo.getChildren(zpath, null);
-    for (String child : children) {
-      byte[] data = zoo.getData(zpath + "/" + child, null, null);
-      LogEntry entry = LogEntry.fromBytes(data);
-      result.put(entry.filename, true);
+    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      // WALs are in use if they are not unreferenced
+      result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);
     }
     return result;
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 62ed9c2..47873f6 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -39,7 +39,6 @@ import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -47,7 +46,10 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -103,20 +105,14 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 
-    Scanner s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    s.setRange(CurrentLogsSection.getRange());
-    s.fetchColumnFamily(CurrentLogsSection.COLF);
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
 
-    Set<String> wals = new HashSet<String>();
-    for (Entry<Key,Value> entry : s) {
-      log.debug("Reading WALs: {}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
-      // hostname:port/uri://path/to/wal
-      String path = new Path(entry.getKey().getColumnQualifier().toString()).toString();
-      log.debug("Extracted file: " + path);
-      wals.add(path);
+    Set<String> result = new HashSet<String>();
+    for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+      log.debug("Reading WALs: {}={}", entry.getKey(), entry.getValue());
+      result.add(entry.getKey().toString());
     }
-
-    return wals;
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 54b42f4..ba68cc2 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.accumulo.core.Constants;
@@ -65,7 +66,7 @@ import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.security.TablePermission;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.core.util.AddressUtil;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
@@ -74,6 +75,8 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.server.replication.StatusCombiner;
@@ -81,6 +84,7 @@ import org.apache.accumulo.server.replication.StatusFormatter;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.util.ReplicationTableUtil;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -131,7 +135,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  private Multimap<String,String> getLogs(Connector conn) throws TableNotFoundException {
+  private Multimap<String,String> getLogs(Connector conn) throws Exception {
     // Map of server to tableId
     Multimap<TServerInstance,String> serverToTableID = HashMultimap.create();
     Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -144,20 +148,13 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
     // Map of logs to tableId
     Multimap<String,String> logs = HashMultimap.create();
-    scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
-    for (Entry<Key,Value> entry : scanner) {
-      if (Thread.interrupted()) {
-        return logs;
-      }
-      Text path = new Text();
-      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-      Text session = new Text();
-      Text hostPort = new Text();
-      MetadataSchema.CurrentLogsSection.getTabletServer(entry.getKey(), hostPort, session);
-      TServerInstance server = new TServerInstance(AddressUtil.parseAddress(hostPort.toString(), false), session.toString());
-      for (String tableId : serverToTableID.get(server)) {
-        logs.put(new Path(path.toString()).toString(), tableId);
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
+      for (UUID id : entry.getValue()) {
+        Pair<WalState,Path> state = wals.state(entry.getKey(), id);
+        for (String tableId : serverToTableID.get(entry.getKey())) {
+          logs.put(state.getSecond().toString(), tableId);
+        }
       }
     }
     return logs;
@@ -308,16 +305,11 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
 
     Set<String> wals = Sets.newHashSet();
-    Scanner s;
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
-      s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-      s.setRange(MetadataSchema.CurrentLogsSection.getRange());
-      s.fetchColumnFamily(MetadataSchema.CurrentLogsSection.COLF);
-      for (Entry<Key,Value> entry : s) {
-        Text path = new Text();
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        wals.add(new Path(path.toString()).toString());
+      WalMarker markers = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+      for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) {
+        wals.add(entry.getKey().toString());
       }
       attempts--;
     }
@@ -511,8 +503,8 @@ public class ReplicationIT extends ConfigurableMacIT {
         while (keepRunning.get()) {
           try {
             logs.putAll(getLogs(conn));
-          } catch (TableNotFoundException e) {
-            log.error("Metadata table doesn't exist");
+          } catch (Exception e) {
+            log.error("Error getting logs", e);
           }
         }
       }


[3/4] accumulo git commit: ACCUMULO-3423 merge origin

Posted by ec...@apache.org.
ACCUMULO-3423 merge origin


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/08633f02
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/08633f02
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/08633f02

Branch: refs/heads/master
Commit: 08633f02bc25ed04db1c1799bc7eeaff425fb91f
Parents: 0b48793 cf9b9a4
Author: Eric Newton <er...@gmail.com>
Authored: Thu May 21 11:46:59 2015 -0400
Committer: Eric Newton <er...@gmail.com>
Committed: Thu May 21 11:46:59 2015 -0400

----------------------------------------------------------------------
 CHANGES                                         |   1 +
 assemble/bin/config.sh                          |   6 +-
 assemble/bin/start-server.sh                    |   4 +-
 .../core/client/impl/ConditionalWriterImpl.java |   2 +-
 .../org/apache/accumulo/core/conf/Property.java |   4 +-
 .../accumulo/core/file/rfile/bcfile/Utils.java  |   2 +-
 .../iterators/user/IntersectingIterator.java    |  19 ++--
 .../mapred/AccumuloFileOutputFormatTest.java    |   2 +
 .../mapreduce/AccumuloFileOutputFormatTest.java |   2 +
 .../main/asciidoc/chapters/administration.txt   |   2 +-
 .../accumulo/examples/simple/shard/Index.java   |   7 +-
 minicluster/pom.xml                             |  10 ++
 .../standalone/StandaloneAccumuloCluster.java   |  12 ++-
 .../standalone/StandaloneClusterControl.java    |  25 +++++
 .../minicluster/MiniAccumuloRunner.java         |   4 +
 .../impl/MiniAccumuloClusterImpl.java           |  76 +++++++------
 .../impl/MiniAccumuloConfigImpl.java            |  45 ++++++--
 .../StandaloneAccumuloClusterTest.java          |  53 +++++++++
 ...niAccumuloClusterExistingZooKeepersTest.java | 107 +++++++++++++++++++
 pom.xml                                         |  20 +++-
 .../apache/accumulo/server/init/Initialize.java |   2 +-
 .../accumulo/server/util/SendLogToChainsaw.java |   2 +-
 .../org/apache/accumulo/monitor/util/Table.java |   2 +-
 .../monitor/util/celltypes/NumberType.java      |   2 +-
 .../accumulo/tserver/log/LocalWALRecovery.java  |  75 ++++++-------
 .../accumulo/tserver/tablet/RootFilesTest.java  |   7 +-
 .../shell/commands/FormatterCommandTest.java    |   2 +-
 .../start/classloader/AccumuloClassLoader.java  |   7 +-
 .../classloader/vfs/UniqueFileReplicator.java   |   3 +-
 .../accumulo/test/continuous/TimeBinner.java    |   3 +
 .../test/continuous/UndefinedAnalyzer.java      |  84 ++++++++-------
 .../test/functional/CacheTestWriter.java        |   3 +
 .../apache/accumulo/test/randomwalk/Node.java   |  16 +--
 .../test/randomwalk/multitable/CopyTool.java    |   6 +-
 .../apache/accumulo/test/AuditMessageIT.java    |   5 +-
 .../accumulo/test/functional/ConstraintIT.java  |  24 ++++-
 .../accumulo/test/proxy/SimpleProxyBase.java    |  16 ++-
 .../test/replication/ReplicationIT.java         |  92 ++++++++--------
 38 files changed, 541 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/08633f02/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/08633f02/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --cc test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index ba68cc2,ef81f2c..41cb75a
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@@ -502,9 -537,9 +528,9 @@@ public class ReplicationIT extends Conf
          // when that happens
          while (keepRunning.get()) {
            try {
-             logs.putAll(getLogs(conn));
+             logs.putAll(getAllLogs(conn));
 -          } catch (TableNotFoundException e) {
 -            log.error("Metadata table doesn't exist");
 +          } catch (Exception e) {
 +            log.error("Error getting logs", e);
            }
          }
        }


[4/4] accumulo git commit: ACCUMULO-3423 fix merge, rename WalMarker to WalStateManager

Posted by ec...@apache.org.
ACCUMULO-3423 fix merge, rename WalMarker to WalStateManager


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/47d1a4db
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/47d1a4db
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/47d1a4db

Branch: refs/heads/master
Commit: 47d1a4db42788e41274f10467c7ccd11a10a3015
Parents: 08633f0
Author: Eric Newton <er...@gmail.com>
Authored: Thu May 21 12:43:21 2015 -0400
Committer: Eric Newton <er...@gmail.com>
Committed: Thu May 21 12:43:21 2015 -0400

----------------------------------------------------------------------
 .../apache/accumulo/server/init/Initialize.java |   4 +-
 .../apache/accumulo/server/log/WalMarker.java   | 217 -------------------
 .../accumulo/server/log/WalStateManager.java    | 217 +++++++++++++++++++
 .../accumulo/server/util/ListVolumesUsed.java   |   4 +-
 .../gc/GarbageCollectWriteAheadLogs.java        |  12 +-
 .../CloseWriteAheadLogReferences.java           |   8 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    |  14 +-
 .../accumulo/master/TabletGroupWatcher.java     |   4 +-
 .../apache/accumulo/tserver/TabletServer.java   |   8 +-
 .../org/apache/accumulo/test/UnusedWALIT.java   |   4 +-
 .../java/org/apache/accumulo/test/VolumeIT.java |   6 +-
 .../accumulo/test/functional/WALSunnyDayIT.java |   6 +-
 ...bageCollectorCommunicatesWithTServersIT.java |   6 +-
 .../test/replication/ReplicationIT.java         |  10 +-
 14 files changed, 260 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 8564b87..80c5879 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -88,7 +88,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -550,7 +550,7 @@ public class Initialize implements KeywordExecutable {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_BASE, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_TSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
-    zoo.putPersistentData(zkInstanceRoot + WalMarker.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + WalStateManager.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
   }
 
   private String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
deleted file mode 100644
index 9cfd99f..0000000
--- a/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.server.log;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.UUID;
-
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.util.Pair;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
-import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.Path;
-import org.apache.zookeeper.KeeperException;
-
-/*
- * WAL Markers.  This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs
- * in use by tablet servers and the replication machinery.
- *
- * The Master needs to know the state of the WALs to mark tablets during recovery.
- * The GC needs to know when a log is no longer needed so it can be removed.
- * The replication mechanism needs to know when a log is closed and can be forwarded to the destination table.
- *
- * The state of the WALs is kept in Zookeeper under <root>/wals.
- * For each server, there is a znode formatted like the TServerInstance.toString(): "host:port[sessionid]".
- * Under the server znode, is a node for each log, using the UUID for the log.
- * In each of the WAL znodes, is the current state of the log, and the full path to the log.
- *
- * The state [OPEN, CLOSED, UNREFERENCED] is what the tablet server believes to be the state of the file.
- *
- * In the event of a recovery, the log is identified as belonging to a dead server.  The master will update
- * the tablets assigned to that server with log references. Once all tablets have been reassigned and the log
- * references are removed, the log will be eligible for deletion.
- *
- * Even when a log is UNREFERENCED by the tablet server, the replication mechanism may still need the log.
- * The GC will defer log removal until replication is finished with it.
- *
- */
-public class WalMarker {
-
-  public class WalMarkerException extends Exception {
-    static private final long serialVersionUID = 1L;
-
-    public WalMarkerException(Exception ex) {
-      super(ex);
-    }
-  }
-
-  public final static String ZWALS = "/wals";
-
-  public static enum WalState {
-    /* log is open, and may be written to */
-    OPEN,
-    /* log is closed, and will not be written to again */
-    CLOSED,
-    /* unreferenced: no tablet needs the log for recovery */
-    UNREFERENCED
-  }
-
-  private final Instance instance;
-  private final ZooReaderWriter zoo;
-
-  public WalMarker(Instance instance, ZooReaderWriter zoo) {
-    this.instance = instance;
-    this.zoo = zoo;
-  }
-
-  private String root() {
-    return ZooUtil.getRoot(instance) + ZWALS;
-  }
-
-  // Tablet server exists
-  public void initWalMarker(TServerInstance tsi) throws WalMarkerException {
-    byte[] data = new byte[0];
-    try {
-      zoo.putPersistentData(root() + "/" + tsi.toString(), data, NodeExistsPolicy.FAIL);
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // Tablet server opens a new WAL
-  public void addNewWalMarker(TServerInstance tsi, Path path) throws WalMarkerException {
-    updateState(tsi, path, WalState.OPEN);
-  }
-
-  private void updateState(TServerInstance tsi, Path path, WalState state) throws WalMarkerException {
-    byte[] data = (state.toString() + "," + path.toString()).getBytes(UTF_8);
-    try {
-      NodeExistsPolicy policy = NodeExistsPolicy.OVERWRITE;
-      if (state == WalState.OPEN) {
-        policy = NodeExistsPolicy.FAIL;
-      }
-      zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + path.getName(), data, policy);
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // Tablet server has no references to the WAL
-  public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerException {
-    updateState(tsi, path, WalState.UNREFERENCED);
-  }
-
-  private static Pair<WalState,Path> parse(byte data[]) {
-    String parts[] = new String(data, UTF_8).split(",");
-    return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
-  }
-
-  // Master needs to know the logs for the given instance
-  public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
-    List<Path> result = new ArrayList<>();
-    try {
-      String zpath = root() + "/" + tsi.toString();
-      zoo.sync(zpath);
-      for (String child : zoo.getChildren(zpath)) {
-        Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
-        if (parts.getFirst() != WalState.UNREFERENCED) {
-          result.add(parts.getSecond());
-        }
-      }
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-    return result;
-  }
-
-  // garbage collector wants the list of logs markers for all servers
-  public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException {
-    Map<TServerInstance,List<UUID>> result = new HashMap<>();
-    try {
-      String path = root();
-      for (String child : zoo.getChildren(path)) {
-        List<UUID> logs = result.get(child);
-        if (logs == null) {
-          result.put(new TServerInstance(child), logs = new ArrayList<>());
-        }
-        for (String idString : zoo.getChildren(path + "/" + child)) {
-          logs.add(UUID.fromString(idString));
-        }
-      }
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-    return result;
-  }
-
-  // garbage collector wants to know the state (open/closed) of a log, and the filename to delete
-  public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
-    try {
-      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
-      return parse(zoo.getData(path, null));
-    } catch (KeeperException | InterruptedException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // utility combination of getAllMarkers and state
-  public Map<Path,WalState> getAllState() throws WalMarkerException {
-    Map<Path,WalState> result = new HashMap<>();
-    for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
-      for (UUID id : entry.getValue()) {
-        Pair<WalState,Path> state = state(entry.getKey(), id);
-        result.put(state.getSecond(), state.getFirst());
-      }
-    }
-    return result;
-  }
-
-  // garbage collector knows it's safe to remove the marker for a closed log
-  public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException {
-    try {
-      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
-      zoo.delete(path, -1);
-    } catch (InterruptedException | KeeperException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // garbage collector knows the instance is dead, and has no markers
-  public void forget(TServerInstance instance) throws WalMarkerException {
-    String path = root() + "/" + instance.toString();
-    try {
-      zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
-    } catch (InterruptedException | KeeperException e) {
-      throw new WalMarkerException(e);
-    }
-  }
-
-  // tablet server can mark the log as closed (but still needed), for replication to begin
-  public void closeWal(TServerInstance instance, Path path) throws WalMarkerException {
-    updateState(instance, path, WalState.CLOSED);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
new file mode 100644
index 0000000..32f3cbe
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalStateManager.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs
+ * in use by tablet servers and the replication machinery.
+ *
+ * The Master needs to know the state of the WALs to mark tablets during recovery.
+ * The GC needs to know when a log is no longer needed so it can be removed.
+ * The replication mechanism needs to know when a log is closed and can be forwarded to the destination table.
+ *
+ * The state of the WALs is kept in Zookeeper under /accumulo/<instanceid>/wals.
+ * For each server, there is a znode formatted like the TServerInstance.toString(): "host:port[sessionid]".
+ * Under each server znode is a node for each log, named with the log's UUID.
+ * Each WAL znode holds the current state of the log and the full path to the log, stored as "STATE,path".
+ *
+ * The state [OPEN, CLOSED, UNREFERENCED] is what the tablet server believes to be the state of the file.
+ *
+ * In the event of a recovery, the log is identified as belonging to a dead server.  The master will update
+ * the tablets assigned to that server with log references. Once all tablets have been reassigned and the log
+ * references are removed, the log will be eligible for deletion.
+ *
+ * Even when a log is UNREFERENCED by the tablet server, the replication mechanism may still need the log.
+ * The GC will defer log removal until replication is finished with it.
+ *
+ */
+public class WalStateManager {
+
+  public static class WalMarkerException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public WalMarkerException(Exception ex) {
+      super(ex);
+    }
+  }
+
+  public final static String ZWALS = "/wals";
+
+  public static enum WalState {
+    /* log is open, and may be written to */
+    OPEN,
+    /* log is closed, and will not be written to again */
+    CLOSED,
+    /* unreferenced: no tablet needs the log for recovery */
+    UNREFERENCED
+  }
+
+  private final Instance instance;
+  private final ZooReaderWriter zoo;
+
+  public WalStateManager(Instance instance, ZooReaderWriter zoo) {
+    this.instance = instance;
+    this.zoo = zoo;
+  }
+
+  private String root() {
+    return ZooUtil.getRoot(instance) + ZWALS;
+  }
+
+  // Tablet server exists: create its znode with empty data (NodeExistsPolicy.FAIL if it is already present)
+  public void initWalMarker(TServerInstance tsi) throws WalMarkerException {
+    byte[] data = new byte[0];
+    try {
+      zoo.putPersistentData(root() + "/" + tsi.toString(), data, NodeExistsPolicy.FAIL);
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // Tablet server opens a new WAL
+  public void addNewWalMarker(TServerInstance tsi, Path path) throws WalMarkerException {
+    updateState(tsi, path, WalState.OPEN);
+  }
+
+  private void updateState(TServerInstance tsi, Path path, WalState state) throws WalMarkerException {
+    byte[] data = (state.toString() + "," + path.toString()).getBytes(UTF_8); // znode payload is "STATE,path"
+    try {
+      NodeExistsPolicy policy = NodeExistsPolicy.OVERWRITE;
+      if (state == WalState.OPEN) { // OPEN is only written by addNewWalMarker, so use FAIL instead of OVERWRITE
+        policy = NodeExistsPolicy.FAIL;
+      }
+      zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + path.getName(), data, policy);
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // Tablet server has no references to the WAL
+  public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerException {
+    updateState(tsi, path, WalState.UNREFERENCED);
+  }
+
+  private static Pair<WalState,Path> parse(byte data[]) {
+    String parts[] = new String(data, UTF_8).split(",", 2); // limit 2: a comma inside the path must not truncate it
+    return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
+  }
+
+  // Master needs to know the logs for the given instance (every log not yet marked UNREFERENCED)
+  public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
+    List<Path> result = new ArrayList<>();
+    try {
+      String zpath = root() + "/" + tsi.toString();
+      zoo.sync(zpath);
+      for (String child : zoo.getChildren(zpath)) {
+        Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
+        if (parts.getFirst() != WalState.UNREFERENCED) {
+          result.add(parts.getSecond());
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+    return result;
+  }
+
+  // garbage collector wants the list of log markers for all servers
+  public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException {
+    Map<TServerInstance,List<UUID>> result = new HashMap<>();
+    try {
+      String path = root();
+      for (String child : zoo.getChildren(path)) {
+        // Each server znode gets a fresh list. The previous result.get(child) probed the
+        // TServerInstance-keyed map with a String, so it always returned null and the null check was dead code.
+        List<UUID> logs = new ArrayList<>();
+        result.put(new TServerInstance(child), logs);
+        for (String idString : zoo.getChildren(path + "/" + child)) {
+          logs.add(UUID.fromString(idString));
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+    return result;
+  }
+
+  // garbage collector wants to know the state (open/closed) of a log, and the filename to delete
+  public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
+    try {
+      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
+      return parse(zoo.getData(path, null));
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // utility combination of getAllMarkers and state: maps every known log path to its recorded state
+  public Map<Path,WalState> getAllState() throws WalMarkerException {
+    Map<Path,WalState> result = new HashMap<>();
+    for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
+      for (UUID id : entry.getValue()) {
+        Pair<WalState,Path> state = state(entry.getKey(), id);
+        result.put(state.getSecond(), state.getFirst());
+      }
+    }
+    return result;
+  }
+
+  // garbage collector knows it's safe to remove the marker for a closed log
+  public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException {
+    try {
+      String path = root() + "/" + instance.toString() + "/" + uuid.toString();
+      zoo.delete(path, -1);
+    } catch (InterruptedException | KeeperException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // garbage collector knows the instance is dead, and has no markers
+  public void forget(TServerInstance instance) throws WalMarkerException {
+    String path = root() + "/" + instance.toString();
+    try {
+      zoo.recursiveDelete(path, NodeMissingPolicy.FAIL);
+    } catch (InterruptedException | KeeperException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  // tablet server can mark the log as closed (but still needed), for replication to begin
+  public void closeWal(TServerInstance instance, Path path) throws WalMarkerException {
+    updateState(instance, path, WalState.CLOSED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 7cf3f37..9ad461b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -34,7 +34,7 @@ import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
 
@@ -125,7 +125,7 @@ public class ListVolumesUsed {
 
     volumes.clear();
 
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
     for (Path path : wals.getAllState().keySet()) {
       volumes.add(getLogURI(path.toString()));
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 414d29e..b8fb9fb 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -45,9 +45,9 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.Listener;
 import org.apache.accumulo.server.master.state.MetaDataStateStore;
@@ -72,7 +72,7 @@ public class GarbageCollectWriteAheadLogs {
   private final VolumeManager fs;
   private final boolean useTrash;
   private final LiveTServerSet liveServers;
-  private final WalMarker walMarker;
+  private final WalStateManager walMarker;
   private final Iterable<TabletLocationState> store;
 
   /**
@@ -97,7 +97,7 @@ public class GarbageCollectWriteAheadLogs {
       }
     });
     liveServers.startListeningForTabletServerChanges();
-    this.walMarker = new WalMarker(context.getInstance(), ZooReaderWriter.getInstance());
+    this.walMarker = new WalStateManager(context.getInstance(), ZooReaderWriter.getInstance());
     this.store = new Iterable<TabletLocationState>() {
       @Override
       public Iterator<TabletLocationState> iterator() {
@@ -119,7 +119,7 @@ public class GarbageCollectWriteAheadLogs {
    *          a started LiveTServerSet instance
    */
   @VisibleForTesting
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet, WalMarker walMarker,
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet, WalStateManager walMarker,
       Iterable<TabletLocationState> store) throws IOException {
     this.context = context;
     this.fs = fs;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index 03d2e67..a3652e2 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -47,9 +47,9 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -168,7 +168,7 @@ public class CloseWriteAheadLogReferences implements Runnable {
    * @return The Set of WALs that are referenced in the metadata table
    */
   protected HashSet<String> getReferencedWals(Connector conn) {
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
 
     HashSet<String> referencedWals = new HashSet<>();
     try {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index 9cb32c8..1ab8eb6 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -38,8 +38,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
@@ -78,7 +78,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void testRemoveUnusedLog() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
@@ -105,7 +105,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void testKeepClosedLog() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 
     GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
@@ -128,7 +128,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void deleteUnreferenceLogOnDeadServer() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
     Scanner scanner = EasyMock.createMock(Scanner.class);
@@ -159,7 +159,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void ignoreReferenceLogOnDeadServer() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
     Scanner scanner = EasyMock.createMock(Scanner.class);
@@ -185,7 +185,7 @@ public class GarbageCollectWriteAheadLogsTest {
   public void replicationDelaysFileCollection() throws Exception {
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    WalStateManager marker = EasyMock.createMock(WalStateManager.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
     Connector conn = EasyMock.createMock(Connector.class);
     Scanner scanner = EasyMock.createMock(Scanner.class);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
index 3ff1aa9..d55781e 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/TabletGroupWatcher.java
@@ -68,7 +68,7 @@ import org.apache.accumulo.master.state.TableStats;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.master.LiveTServerSet.TServerConnection;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.ClosableIterator;
@@ -129,7 +129,7 @@ class TabletGroupWatcher extends Daemon {
     int[] oldCounts = new int[TabletState.values().length];
     EventCoordinator.Listener eventListener = this.master.nextEvent.getListener();
 
-    WalMarker wals = new WalMarker(master.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(master.getInstance(), ZooReaderWriter.getInstance());
 
     while (this.master.stillMaster()) {
       // slow things down a little, otherwise we spam the logs when there are many wake-up events

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 23a4b34..c0a29eb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -161,8 +161,8 @@ import org.apache.accumulo.server.fs.VolumeManager.FileType;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.fs.VolumeUtil;
 import org.apache.accumulo.server.log.SortedLogState;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalMarkerException;
 import org.apache.accumulo.server.master.recovery.RecoveryPath;
 import org.apache.accumulo.server.master.state.Assignment;
 import org.apache.accumulo.server.master.state.DistributedStoreException;
@@ -321,7 +321,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   private final ServerConfigurationFactory confFactory;
 
   private final ZooAuthenticationKeyWatcher authKeyWatcher;
-  private final WalMarker walMarker;
+  private final WalStateManager walMarker;
 
   public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) {
     super(confFactory);
@@ -367,7 +367,7 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
         TabletLocator.clearLocators();
       }
     }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS));
-    walMarker = new WalMarker(instance, ZooReaderWriter.getInstance());
+    walMarker = new WalStateManager(instance, ZooReaderWriter.getInstance());
 
     // Create the secret manager
     setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME)));

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
index b1c010c..8d22ad3 100644
--- a/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/UnusedWALIT.java
@@ -35,7 +35,7 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalStateManager;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
@@ -122,7 +122,7 @@ public class UnusedWALIT extends ConfigurableMacIT {
   }
 
   private int getWALCount(Connector c) throws Exception {
-    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(c.getInstance(), ZooReaderWriter.getInstance());
     int result = 0;
     for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
       result += entry.getValue().size();

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
index 8f4fe75..b66d13f 100644
--- a/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/VolumeIT.java
@@ -65,8 +65,8 @@ import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.init.Initialize;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.util.Admin;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
@@ -428,7 +428,7 @@ public class VolumeIT extends ConfigurableMacIT {
       Assert.fail("Unexpected volume " + path);
     }
 
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
     outer: for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
       for (Path path : paths) {
         if (entry.getKey().toString().startsWith(path.toString())) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index 22e2930..8d7dd62 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -52,8 +52,8 @@ import org.apache.accumulo.master.state.SetGoalState;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterControl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -200,7 +200,7 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
 
   private Map<String,Boolean> getWals(Connector c, ZooKeeper zoo) throws Exception {
     Map<String,Boolean> result = new HashMap<>();
-    WalMarker wals = new WalMarker(c.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(c.getInstance(), ZooReaderWriter.getInstance());
     for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
       // WALs are in use if they are not unreferenced
       result.put(entry.getKey().toString(), entry.getValue() != WalState.UNREFERENCED);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
index 47873f6..ddaef00 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/GarbageCollectorCommunicatesWithTServersIT.java
@@ -46,8 +46,8 @@ import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.thrift.TabletClientService.Client;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.test.functional.ConfigurableMacIT;
@@ -105,7 +105,7 @@ public class GarbageCollectorCommunicatesWithTServersIT extends ConfigurableMacI
 
     Assert.assertNotNull("Could not determine table ID for " + tableName, tableId);
 
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
 
     Set<String> result = new HashSet<String>();
     for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/47d1a4db/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 41cb75a..da9dd24 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -76,8 +76,8 @@ import org.apache.accumulo.fate.zookeeper.ZooLock;
 import org.apache.accumulo.gc.SimpleGarbageCollector;
 import org.apache.accumulo.minicluster.ServerType;
 import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
-import org.apache.accumulo.server.log.WalMarker;
-import org.apache.accumulo.server.log.WalMarker.WalState;
+import org.apache.accumulo.server.log.WalStateManager;
+import org.apache.accumulo.server.log.WalStateManager.WalState;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.replication.ReplicaSystemFactory;
 import org.apache.accumulo.server.replication.StatusCombiner;
@@ -149,7 +149,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     }
     // Map of logs to tableId
     Multimap<String,String> logs = HashMultimap.create();
-    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    WalStateManager wals = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
     for (Entry<TServerInstance,List<UUID>> entry : wals.getAllMarkers().entrySet()) {
       for (UUID id : entry.getValue()) {
         Pair<WalState,Path> state = wals.state(entry.getKey(), id);
@@ -161,7 +161,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     return logs;
   }
 
-  private Multimap<String,String> getAllLogs(Connector conn) throws TableNotFoundException {
+  private Multimap<String,String> getAllLogs(Connector conn) throws Exception {
     Multimap<String,String> logs = getLogs(conn);
     try {
       Scanner scanner = conn.createScanner(ReplicationTable.NAME, Authorizations.EMPTY);
@@ -333,7 +333,7 @@ public class ReplicationIT extends ConfigurableMacIT {
     Set<String> wals = Sets.newHashSet();
     attempts = 5;
     while (wals.isEmpty() && attempts > 0) {
-      WalMarker markers = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+      WalStateManager markers = new WalStateManager(conn.getInstance(), ZooReaderWriter.getInstance());
       for (Entry<Path,WalState> entry : markers.getAllState().entrySet()) {
         wals.add(entry.getKey().toString());
       }


[2/4] accumulo git commit: ACCUMULO-3423 use zookeeper to track WAL state

Posted by ec...@apache.org.
ACCUMULO-3423 use zookeeper to track WAL state


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0b487930
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0b487930
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0b487930

Branch: refs/heads/master
Commit: 0b487930b4096ab9ed76e19735b1cc1cc6512b9e
Parents: 61f9279
Author: Eric Newton <er...@gmail.com>
Authored: Thu May 21 11:44:00 2015 -0400
Committer: Eric Newton <er...@gmail.com>
Committed: Thu May 21 11:44:00 2015 -0400

----------------------------------------------------------------------
 .../core/metadata/schema/MetadataSchema.java    |  47 --
 .../core/metadata/MetadataTableSchemaTest.java  |  47 --
 .../apache/accumulo/server/init/Initialize.java |   2 +
 .../apache/accumulo/server/log/WalMarker.java   | 217 +++++++++
 .../server/master/state/MetaDataStateStore.java |  29 --
 .../server/master/state/TServerInstance.java    |  14 +-
 .../server/master/state/TabletStateStore.java   |   6 -
 .../master/state/ZooTabletStateStore.java       |  10 -
 .../accumulo/server/util/ListVolumesUsed.java   |  13 +-
 .../accumulo/server/util/MetadataTableUtil.java | 138 ------
 .../gc/GarbageCollectWriteAheadLogs.java        | 439 ++++++-------------
 .../CloseWriteAheadLogReferences.java           |  58 +--
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 259 ++++++-----
 .../CloseWriteAheadLogReferencesTest.java       | 132 ------
 .../accumulo/master/TabletGroupWatcher.java     |  14 +-
 .../apache/accumulo/tserver/TabletServer.java   |  38 +-
 .../tserver/log/TabletServerLogger.java         |   9 +-
 .../org/apache/accumulo/test/UnusedWALIT.java   |  17 +-
 .../java/org/apache/accumulo/test/VolumeIT.java |  23 +-
 .../accumulo/test/functional/WALSunnyDayIT.java |  28 +-
 ...bageCollectorCommunicatesWithTServersIT.java |  22 +-
 .../test/replication/ReplicationIT.java         |  44 +-
 22 files changed, 630 insertions(+), 976 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index fe75f9e..8a98ba0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -16,14 +16,11 @@
  */
 package org.apache.accumulo.core.metadata.schema;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.data.ArrayByteSequence;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.schema.Section;
 import org.apache.accumulo.core.util.ColumnFQ;
 import org.apache.hadoop.io.Text;
@@ -282,48 +279,4 @@ public class MetadataSchema {
     }
   }
 
-  /**
-   * Holds references to the WALs in use in a live Tablet Server.
-   * <p>
-   * <code>~wal+tserver:port[sessionId] log:hdfs://localhost:8020/accumulo/wal/tserver+port/WAL  [] -></code>
-   */
-  public static class CurrentLogsSection {
-    private static final Section section = new Section(RESERVED_PREFIX + "wal+", true, RESERVED_PREFIX + "wal,", false);
-    private static byte LEFT_BRACKET = (byte) '[';
-    public static final Text COLF = new Text("log");
-    public static final Value UNUSED = new Value("unused".getBytes(UTF_8));
-
-    public static Range getRange() {
-      return section.getRange();
-    }
-
-    public static String getRowPrefix() {
-      return section.getRowPrefix();
-    }
-
-    public static void getTabletServer(Key k, Text hostPort, Text session) {
-      Preconditions.checkNotNull(k);
-      Preconditions.checkNotNull(hostPort);
-      Preconditions.checkNotNull(session);
-
-      Text row = new Text();
-      k.getRow(row);
-      if (!row.toString().startsWith(section.getRowPrefix())) {
-        throw new IllegalArgumentException("Bad key " + k.toString());
-      }
-      for (int sessionStart = section.getRowPrefix().length(); sessionStart < row.getLength() - 1; sessionStart++) {
-        if (row.charAt(sessionStart) == LEFT_BRACKET) {
-          hostPort.set(row.getBytes(), section.getRowPrefix().length(), sessionStart - section.getRowPrefix().length());
-          session.set(row.getBytes(), sessionStart + 1, row.getLength() - sessionStart - 2);
-          return;
-        }
-      }
-      throw new IllegalArgumentException("Bad key " + k.toString());
-    }
-
-    public static void getPath(Key k, Text path) {
-      k.getColumnQualifier(path);
-    }
-
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
deleted file mode 100644
index cfe59f2..0000000
--- a/core/src/test/java/org/apache/accumulo/core/metadata/MetadataTableSchemaTest.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.accumulo.core.metadata;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.hadoop.io.Text;
-import org.junit.Test;
-
-public class MetadataTableSchemaTest {
-
-  @Test
-  public void testGetTabletServer() throws Exception {
-    Key key = new Key("~wal+host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
-    Text hostPort = new Text();
-    Text session = new Text();
-    CurrentLogsSection.getTabletServer(key, hostPort, session);
-    assertEquals("host:43861", hostPort.toString());
-    assertEquals("14a7df0e6420003", session.toString());
-    try {
-      Key bogus = new Key("~wal/host:43861[14a7df0e6420003]", "log", "hdfs://localhost:50514/accumulo/wal/host:43861/70c27ab3-6662-40ab-80fb-01c1f1a59df3");
-      CurrentLogsSection.getTabletServer(bogus, hostPort, session);
-      fail("bad argument not thrown");
-    } catch (IllegalArgumentException ex) {
-
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
index 9afb93f..4753d38 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/init/Initialize.java
@@ -88,6 +88,7 @@ import org.apache.accumulo.server.constraints.MetadataConstraints;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.iterators.MetadataBulkLoadFilter;
+import org.apache.accumulo.server.log.WalMarker;
 import org.apache.accumulo.server.replication.ReplicationUtil;
 import org.apache.accumulo.server.replication.StatusCombiner;
 import org.apache.accumulo.server.security.AuditedSecurityOperation;
@@ -549,6 +550,7 @@ public class Initialize implements KeywordExecutable {
     zoo.putPersistentData(zkInstanceRoot + Constants.ZMONITOR_LOCK, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_BASE, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
     zoo.putPersistentData(zkInstanceRoot + ReplicationConstants.ZOO_TSERVERS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
+    zoo.putPersistentData(zkInstanceRoot + WalMarker.ZWALS, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
   }
 
   private String getInstanceNamePath(Opts opts) throws IOException, KeeperException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java b/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
new file mode 100644
index 0000000..9cfd99f
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/log/WalMarker.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.log;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.util.Pair;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
+import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.Path;
+import org.apache.zookeeper.KeeperException;
+
+/*
+ * WAL Markers.  This class governs the space in Zookeeper that advertises the status of Write-Ahead Logs
+ * in use by tablet servers and the replication machinery.
+ *
+ * The Master needs to know the state of the WALs to mark tablets during recovery.
+ * The GC needs to know when a log is no longer needed so it can be removed.
+ * The replication mechanism needs to know when a log is closed and can be forwarded to the destination table.
+ *
+ * The state of the WALs is kept in Zookeeper under <root>/wals.
+ * For each server, there is a znode formatted like the TServerInstance.toString(): "host:port[sessionid]".
+ * Under the server znode is a node for each log, named with the UUID of the log.
+ * Each WAL znode holds the current state of the log and the full path to the log.
+ *
+ * The state [OPEN, CLOSED, UNREFERENCED] is what the tablet server believes to be the state of the file.
+ *
+ * In the event of a recovery, the log is identified as belonging to a dead server.  The master will update
+ * the tablets assigned to that server with log references. Once all tablets have been reassigned and the log
+ * references are removed, the log will be eligible for deletion.
+ *
+ * Even when a log is UNREFERENCED by the tablet server, the replication mechanism may still need the log.
+ * The GC will defer log removal until replication is finished with it.
+ *
+ */
+public class WalMarker {
+
+  /** Checked wrapper for ZooKeeper failures; static so that it needs no enclosing WalMarker instance. */
+  public static class WalMarkerException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public WalMarkerException(Exception ex) {
+      super(ex);
+    }
+  }
+
+  public static final String ZWALS = "/wals";
+
+  public enum WalState {
+    /** log is open, and may be written to */
+    OPEN,
+    /** log is closed, and will not be written to again */
+    CLOSED,
+    /** unreferenced: no tablet needs the log for recovery */
+    UNREFERENCED
+  }
+
+  private final Instance instance;
+  private final ZooReaderWriter zoo;
+
+  public WalMarker(Instance instance, ZooReaderWriter zoo) {
+    this.instance = instance;
+    this.zoo = zoo;
+  }
+
+  private String root() {
+    return ZooUtil.getRoot(instance) + ZWALS;
+  }
+
+  /** Tablet server exists: create its (empty) parent znode, failing if one is already present. */
+  public void initWalMarker(TServerInstance tsi) throws WalMarkerException {
+    try {
+      zoo.putPersistentData(root() + "/" + tsi.toString(), new byte[0], NodeExistsPolicy.FAIL);
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  /** Tablet server opens a new WAL. */
+  public void addNewWalMarker(TServerInstance tsi, Path path) throws WalMarkerException {
+    updateState(tsi, path, WalState.OPEN);
+  }
+
+  // Stores "STATE,path" in the log's znode: an OPEN marker must be new (FAIL), later states overwrite it
+  private void updateState(TServerInstance tsi, Path path, WalState state) throws WalMarkerException {
+    byte[] data = (state.toString() + "," + path.toString()).getBytes(UTF_8);
+    NodeExistsPolicy policy = state == WalState.OPEN ? NodeExistsPolicy.FAIL : NodeExistsPolicy.OVERWRITE;
+    try {
+      zoo.putPersistentData(root() + "/" + tsi.toString() + "/" + path.getName(), data, policy);
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  /** Tablet server has no references to the WAL. */
+  public void walUnreferenced(TServerInstance tsi, Path path) throws WalMarkerException {
+    updateState(tsi, path, WalState.UNREFERENCED);
+  }
+
+  // Decodes "STATE,path", splitting on the first comma only so that a path containing commas stays intact
+  private static Pair<WalState,Path> parse(byte[] data) {
+    String[] parts = new String(data, UTF_8).split(",", 2);
+    if (parts.length != 2) {
+      throw new IllegalArgumentException("Malformed WAL marker: " + new String(data, UTF_8));
+    }
+    return new Pair<>(WalState.valueOf(parts[0]), new Path(parts[1]));
+  }
+
+  /** Master needs to know the logs for the given instance: every log not yet marked UNREFERENCED. */
+  public List<Path> getWalsInUse(TServerInstance tsi) throws WalMarkerException {
+    List<Path> result = new ArrayList<>();
+    try {
+      String zpath = root() + "/" + tsi.toString();
+      zoo.sync(zpath);
+      for (String child : zoo.getChildren(zpath)) {
+        Pair<WalState,Path> parts = parse(zoo.getData(zpath + "/" + child, null));
+        if (parts.getFirst() != WalState.UNREFERENCED) {
+          result.add(parts.getSecond());
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+    return result;
+  }
+
+  /** Garbage collector wants the list of log markers for all servers. */
+  public Map<TServerInstance,List<UUID>> getAllMarkers() throws WalMarkerException {
+    Map<TServerInstance,List<UUID>> result = new HashMap<>();
+    try {
+      String path = root();
+      for (String child : zoo.getChildren(path)) {
+        TServerInstance server = new TServerInstance(child);
+        List<UUID> logs = result.get(server); // keyed by the parsed instance, not the raw znode name
+        if (logs == null) {
+          result.put(server, logs = new ArrayList<>());
+        }
+        for (String idString : zoo.getChildren(path + "/" + child)) {
+          logs.add(UUID.fromString(idString));
+        }
+      }
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+    return result;
+  }
+
+  /** Garbage collector wants to know the state (open/closed) of a log, and the filename to delete. */
+  public Pair<WalState,Path> state(TServerInstance instance, UUID uuid) throws WalMarkerException {
+    try {
+      return parse(zoo.getData(root() + "/" + instance.toString() + "/" + uuid.toString(), null));
+    } catch (KeeperException | InterruptedException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  /** Utility combination of getAllMarkers and state: maps every known log path to its state. */
+  public Map<Path,WalState> getAllState() throws WalMarkerException {
+    Map<Path,WalState> result = new HashMap<>();
+    for (Entry<TServerInstance,List<UUID>> entry : getAllMarkers().entrySet()) {
+      for (UUID id : entry.getValue()) {
+        Pair<WalState,Path> state = state(entry.getKey(), id);
+        result.put(state.getSecond(), state.getFirst());
+      }
+    }
+    return result;
+  }
+
+  /** Garbage collector knows it's safe to remove the marker for a closed log. */
+  public void removeWalMarker(TServerInstance instance, UUID uuid) throws WalMarkerException {
+    try {
+      zoo.delete(root() + "/" + instance.toString() + "/" + uuid.toString(), -1);
+    } catch (InterruptedException | KeeperException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  /** Garbage collector knows the instance is dead, and has no markers. */
+  public void forget(TServerInstance instance) throws WalMarkerException {
+    try {
+      zoo.recursiveDelete(root() + "/" + instance.toString(), NodeMissingPolicy.FAIL);
+    } catch (InterruptedException | KeeperException e) {
+      throw new WalMarkerException(e);
+    }
+  }
+
+  /** Tablet server can mark the log as closed (but still needed), for replication to begin. */
+  public void closeWal(TServerInstance instance, Path path) throws WalMarkerException {
+    updateState(instance, path, WalState.CLOSED);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
index 600349b..7763c25 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/MetaDataStateStore.java
@@ -19,7 +19,6 @@ package org.apache.accumulo.server.master.state;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.BatchWriter;
@@ -33,11 +32,8 @@ import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
 
 public class MetaDataStateStore extends TabletStateStore {
-  private static final Logger log = Logger.getLogger(MetaDataStateStore.class);
 
   private static final int THREADS = 4;
   private static final int LATENCY = 1000;
@@ -163,29 +159,4 @@ public class MetaDataStateStore extends TabletStateStore {
     return "Normal Tablets";
   }
 
-  @Override
-  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException {
-    BatchWriter writer = createBatchWriter();
-    try {
-      for (Entry<TServerInstance,List<Path>> entry : logs.entrySet()) {
-        if (entry.getValue().isEmpty()) {
-          continue;
-        }
-        Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
-        for (Path log : entry.getValue()) {
-          m.put(MetadataSchema.CurrentLogsSection.COLF, new Text(log.toString()), MetadataSchema.CurrentLogsSection.UNUSED);
-        }
-        writer.addMutation(m);
-      }
-    } catch (Exception ex) {
-      log.error("Error marking logs as unused: " + logs);
-      throw new DistributedStoreException(ex);
-    } finally {
-      try {
-        writer.close();
-      } catch (MutationsRejectedException e) {
-        throw new DistributedStoreException(e);
-      }
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
index d2d4f44..ace9f05 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TServerInstance.java
@@ -42,8 +42,8 @@ public class TServerInstance implements Comparable<TServerInstance>, Serializabl
 
   // HostAndPort is not Serializable
   private transient HostAndPort location;
-  private String session;
-  private String cachedStringRepresentation;
+  private final String session;
+  private final String cachedStringRepresentation;
 
   public TServerInstance(HostAndPort address, String session) {
     this.location = address;
@@ -51,6 +51,16 @@ public class TServerInstance implements Comparable<TServerInstance>, Serializabl
     this.cachedStringRepresentation = hostPort() + "[" + session + "]";
   }
 
+  public TServerInstance(String formattedString) {
+    int pos = formattedString.lastIndexOf('['); // session is the last bracketed group; a bracketed IPv6 host such as [::1]:9997 also contains '['
+    if (pos < 0 || !formattedString.endsWith("]")) {
+      throw new IllegalArgumentException(formattedString);
+    }
+    this.location = HostAndPort.fromString(formattedString.substring(0, pos));
+    this.session = formattedString.substring(pos + 1, formattedString.length() - 1);
+    this.cachedStringRepresentation = hostPort() + "[" + session + "]";
+  }
+
   public TServerInstance(HostAndPort address, long session) {
     this(address, Long.toHexString(session));
   }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
index 147e071..3ead237 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/TabletStateStore.java
@@ -88,10 +88,4 @@ public abstract class TabletStateStore implements Iterable<TabletLocationState>
     }
     store.setLocations(Collections.singletonList(assignment));
   }
-
-  /**
-   * When a server fails, its logs must be marked as unused after the log markers are moved to the tablets.
-   */
-  abstract public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) throws DistributedStoreException;
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
index 03627e3..720046f 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/master/state/ZooTabletStateStore.java
@@ -27,9 +27,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -183,8 +181,6 @@ public class ZooTabletStateStore extends TabletStateStore {
             throw new DistributedStoreException(ex);
           }
           store.put(RootTable.ZROOT_TABLET_WALOGS + "/" + logEntry.getUniqueID(), value);
-          store.remove(RootTable.ZROOT_TABLET_CURRENT_LOGS + "/" + MetadataSchema.CurrentLogsSection.getRowPrefix() + tls.current.toString()
-              + logEntry.getUniqueID());
         }
       }
     }
@@ -197,10 +193,4 @@ public class ZooTabletStateStore extends TabletStateStore {
   public String name() {
     return "Root Table";
   }
-
-  @Override
-  public void markLogsAsUnused(AccumuloServerContext context, Map<TServerInstance,List<Path>> logs) {
-    // the root table is not replicated, so unassigning the root tablet has removed the current log marker
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
index 9e3fc7d..7cf3f37 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/ListVolumesUsed.java
@@ -34,8 +34,9 @@ import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.conf.ServerConfigurationFactory;
 import org.apache.accumulo.server.fs.VolumeManager.FileType;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
 
 /**
  *
@@ -123,15 +124,13 @@ public class ListVolumesUsed {
       System.out.println("\tVolume : " + volume);
 
     volumes.clear();
-    scanner.clearColumns();
-    scanner.setRange(MetadataSchema.CurrentLogsSection.getRange());
-    Text path = new Text();
-    for (Entry<Key,Value> entry : scanner) {
-      MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
+
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
+    for (Path path : wals.getAllState().keySet()) {
       volumes.add(getLogURI(path.toString()));
     }
 
-    System.out.println("Listing volumes referenced in " + name + " current logs section");
+    System.out.println("Listing volumes referenced in " + name + " current logs");
 
     for (String volume : volumes)
       System.out.println("\tVolume : " + volume);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 5d5415d..45d2fef 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -59,7 +59,6 @@ import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
@@ -81,12 +80,10 @@ import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
-import org.apache.accumulo.server.TabletLevel;
 import org.apache.accumulo.server.client.HdfsZooInstance;
 import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.fs.VolumeManagerImpl;
-import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.tablets.TabletTime;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
@@ -1055,139 +1052,4 @@ public class MetadataTableUtil {
     return tabletEntries;
   }
 
-  public static void addNewLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename, TabletLevel level) {
-    log.debug("Adding log entry " + filename);
-    if (level == TabletLevel.ROOT) {
-      LogEntry log = new LogEntry(RootTable.EXTENT, System.currentTimeMillis(), tabletSession.hostPort(), filename.toString());
-      final byte[] node;
-      try {
-        node = log.toBytes();
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to write to byte array", e);
-      }
-      retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
-        @Override
-        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-          String uniqueId = filename.getName();
-          StringBuilder path = new StringBuilder(root);
-          path.append("/");
-          path.append(CurrentLogsSection.getRowPrefix());
-          path.append(tabletSession.toString());
-          path.append(uniqueId);
-          rw.putPersistentData(path.toString(), node, NodeExistsPolicy.OVERWRITE);
-        }
-      });
-    } else {
-      Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-      m.put(CurrentLogsSection.COLF, new Text(filename.toString()), new Value(EMPTY_BYTES));
-      String tableName = MetadataTable.NAME;
-      if (level == TabletLevel.META) {
-        tableName = RootTable.NAME;
-      }
-      BatchWriter bw = null;
-      try {
-        bw = context.getConnector().createBatchWriter(tableName, null);
-        bw.addMutation(m);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      } finally {
-        if (bw != null) {
-          try {
-            bw.close();
-          } catch (Exception e2) {
-            throw new RuntimeException(e2);
-          }
-        }
-      }
-    }
-  }
-
-  private static void removeCurrentRootLogMarker(ClientContext context, ZooLock zooLock, final TServerInstance tabletSession, final Path filename) {
-    retryZooKeeperUpdate(context, zooLock, new ZooOperation() {
-      @Override
-      public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-        String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-        String uniqueId = filename.getName();
-        String path = root + "/" + CurrentLogsSection.getRowPrefix() + tabletSession.toString() + uniqueId;
-        log.debug("Removing entry " + path + " from zookeeper");
-        rw.recursiveDelete(path, NodeMissingPolicy.SKIP);
-      }
-    });
-  }
-
-  public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
-    // There could be a marker at the meta and/or root level, mark them both as unused
-    try {
-      BatchWriter root = null;
-      BatchWriter meta = null;
-      try {
-        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
-        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-        for (Path fname : all) {
-          Text tname = new Text(fname.toString());
-          Mutation m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-          m.putDelete(MetadataSchema.CurrentLogsSection.COLF, tname);
-          root.addMutation(m);
-          log.debug("deleting " + MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString() + " log:" + fname);
-          m = new Mutation(MetadataSchema.CurrentLogsSection.getRowPrefix() + tabletSession.toString());
-          m.put(MetadataSchema.CurrentLogsSection.COLF, tname, MetadataSchema.CurrentLogsSection.UNUSED);
-          meta.addMutation(m);
-          removeCurrentRootLogMarker(context, lock, tabletSession, fname);
-        }
-      } finally {
-        if (root != null) {
-          root.close();
-        }
-        if (meta != null) {
-          meta.close();
-        }
-      }
-    } catch (Exception ex) {
-      throw new AccumuloException(ex);
-    }
-  }
-
-  public static void fetchLogsForDeadServer(ClientContext context, ZooLock lock, KeyExtent extent, TServerInstance server,
-      Map<TServerInstance,List<Path>> logsForDeadServers) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
-    // already cached
-    if (logsForDeadServers.containsKey(server)) {
-      return;
-    }
-    if (extent.isRootTablet()) {
-      final List<Path> logs = new ArrayList<>();
-      retryZooKeeperUpdate(context, lock, new ZooOperation() {
-        @Override
-        public void run(IZooReaderWriter rw) throws KeeperException, InterruptedException, IOException {
-          String root = ZooUtil.getRoot(HdfsZooInstance.getInstance()) + RootTable.ZROOT_TABLET_CURRENT_LOGS;
-          logs.clear();
-          for (String child : rw.getChildren(root)) {
-            byte[] data = rw.getData(root + "/" + child, null);
-            LogEntry entry = LogEntry.fromBytes(data);
-            logs.add(new Path(entry.filename));
-          }
-        }
-      });
-      logsForDeadServers.put(server, logs);
-    } else {
-      // use the correct meta table
-      String table = MetadataTable.NAME;
-      if (extent.isMeta()) {
-        table = RootTable.NAME;
-      }
-      // fetch the current logs in use, and put them in the cache
-      Scanner scanner = context.getConnector().createScanner(table, Authorizations.EMPTY);
-      scanner.setRange(new Range(MetadataSchema.CurrentLogsSection.getRowPrefix() + server.toString()));
-      List<Path> logs = new ArrayList<>();
-      Text path = new Text();
-      for (Entry<Key,Value> entry : scanner) {
-        MetadataSchema.CurrentLogsSection.getPath(entry.getKey(), path);
-        if (!entry.getValue().equals(MetadataSchema.CurrentLogsSection.UNUSED)) {
-          logs.add(new Path(path.toString()));
-        }
-      }
-      logsForDeadServers.put(server, logs);
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 194d357..414d29e 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -30,36 +30,24 @@ import java.util.UUID;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
-import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.Mutation;
-import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.gc.thrift.GCStatus;
 import org.apache.accumulo.core.gc.thrift.GcCycleStats;
 import org.apache.accumulo.core.metadata.MetadataTable;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
-import org.apache.accumulo.core.protobuf.ProtobufUtil;
-import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
-import org.apache.accumulo.core.replication.ReplicationTable;
-import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.security.Authorizations;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
-import org.apache.accumulo.core.volume.Volume;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.fs.VolumeManagerImpl;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.Listener;
 import org.apache.accumulo.server.master.state.MetaDataStateStore;
@@ -67,34 +55,25 @@ import org.apache.accumulo.server.master.state.RootTabletStateStore;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.accumulo.server.master.state.TabletState;
-import org.apache.accumulo.server.replication.StatusUtil;
-import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
-import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
-import com.google.common.net.HostAndPort;
-import com.google.protobuf.InvalidProtocolBufferException;
 
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
 
-  // The order of these is _very_ important. Must read from current_logs, then walogs because ZooTabletStateStore writes to
-  // walogs and then removes from current_logs
-  private static final String[] ZK_LOG_SUFFIXES = new String[] {RootTable.ZROOT_TABLET_CURRENT_LOGS, RootTable.ZROOT_TABLET_WALOGS};
-
   private final AccumuloServerContext context;
   private final VolumeManager fs;
   private final boolean useTrash;
   private final LiveTServerSet liveServers;
+  private final WalMarker walMarker;
+  private final Iterable<TabletLocationState> store;
 
   /**
    * Creates a new GC WAL object.
@@ -106,7 +85,7 @@ public class GarbageCollectWriteAheadLogs {
    * @param useTrash
    *          true to move files to trash rather than delete them
    */
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(final AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
@@ -118,6 +97,13 @@ public class GarbageCollectWriteAheadLogs {
       }
     });
     liveServers.startListeningForTabletServerChanges();
+    this.walMarker = new WalMarker(context.getInstance(), ZooReaderWriter.getInstance());
+    this.store = new Iterable<TabletLocationState>() {
+      @Override
+      public Iterator<TabletLocationState> iterator() {
+        return Iterators.concat(new RootTabletStateStore(context).iterator(), new MetaDataStateStore(context).iterator());
+      }
+    };
   }
 
   /**
@@ -133,32 +119,45 @@ public class GarbageCollectWriteAheadLogs {
    *          a started LiveTServerSet instance
    */
   @VisibleForTesting
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet) throws IOException {
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet, WalMarker walMarker,
+      Iterable<TabletLocationState> store) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
     this.liveServers = liveTServerSet;
+    this.walMarker = walMarker;
+    this.store = store;
   }
 
   public void collect(GCStatus status) {
 
     Span span = Trace.start("getCandidates");
     try {
-      Set<TServerInstance> currentServers = liveServers.getCurrentServers();
-
       status.currentLog.started = System.currentTimeMillis();
 
-      Map<TServerInstance,Set<Path>> candidates = new HashMap<>();
-      long count = getCurrent(candidates, currentServers);
+      Map<TServerInstance,Set<UUID>> logsByServer = new HashMap<>();
+      Map<UUID,Pair<WalState,Path>> logsState = new HashMap<>();
+      // Scan for log file info first: the order is important
+      // Consider what could happen if the live servers were fetched first:
+      // * get live servers
+      // * new server gets a lock, creates a log
+      // * get logs
+      // * the log appears to belong to a dead server
+      long count = getCurrent(logsByServer, logsState);
       long fileScanStop = System.currentTimeMillis();
 
-      log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, candidates.size(), (fileScanStop - status.currentLog.started) / 1000.));
+      log.info(String.format("Fetched %d files for %d servers in %.2f seconds", count, logsByServer.size(), (fileScanStop - status.currentLog.started) / 1000.));
       status.currentLog.candidates = count;
       span.stop();
 
+      // now it's safe to get the liveServers
+      Set<TServerInstance> currentServers = liveServers.getCurrentServers();
+
+      Map<UUID,TServerInstance> uuidToTServer;
       span = Trace.start("removeEntriesInUse");
       try {
-        count = removeEntriesInUse(candidates, status, currentServers);
+        uuidToTServer = removeEntriesInUse(logsByServer, currentServers, logsState);
+        count = uuidToTServer.size();
       } catch (Exception ex) {
         log.error("Unable to scan metadata table", ex);
         return;
@@ -171,7 +170,7 @@ public class GarbageCollectWriteAheadLogs {
 
       span = Trace.start("removeReplicationEntries");
       try {
-        count = removeReplicationEntries(candidates, status);
+        count = removeReplicationEntries(uuidToTServer);
       } catch (Exception ex) {
         log.error("Unable to scan replication table", ex);
         return;
@@ -184,14 +183,15 @@ public class GarbageCollectWriteAheadLogs {
 
       span = Trace.start("removeFiles");
 
-      count = removeFiles(candidates, status);
+      logsState.keySet().retainAll(uuidToTServer.keySet());
+      count = removeFiles(logsState.values(), status);
 
       long removeStop = System.currentTimeMillis();
-      log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, candidates.size(), (removeStop - logEntryScanStop) / 1000.));
+      log.info(String.format("%d total logs removed from %d servers in %.2f seconds", count, logsByServer.size(), (removeStop - logEntryScanStop) / 1000.));
       span.stop();
 
       span = Trace.start("removeMarkers");
-      count = removeTabletServerMarkers(candidates);
+      count = removeTabletServerMarkers(uuidToTServer, logsByServer, currentServers);
       long removeMarkersStop = System.currentTimeMillis();
       log.info(String.format("%d markers removed in %.2f seconds", count, (removeMarkersStop - removeStop) / 1000.));
       span.stop();
@@ -207,50 +207,43 @@ public class GarbageCollectWriteAheadLogs {
     }
   }
 
-  private long removeTabletServerMarkers(Map<TServerInstance,Set<Path>> candidates) {
+  private long removeTabletServerMarkers(Map<UUID,TServerInstance> uidMap, Map<TServerInstance,Set<UUID>> candidates, Set<TServerInstance> liveServers) {
     long result = 0;
+    // remove markers for files removed
     try {
-      BatchWriter root = null;
-      BatchWriter meta = null;
-      try {
-        root = context.getConnector().createBatchWriter(RootTable.NAME, null);
-        meta = context.getConnector().createBatchWriter(MetadataTable.NAME, null);
-        for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-          Mutation m = new Mutation(CurrentLogsSection.getRowPrefix() + entry.getKey().toString());
-          for (Path path : entry.getValue()) {
-            m.putDelete(CurrentLogsSection.COLF, new Text(path.toString()));
-            result++;
-          }
-          root.addMutation(m);
-          meta.addMutation(m);
-        }
-      } finally {
-        if (meta != null) {
-          meta.close();
-        }
-        if (root != null) {
-          root.close();
-        }
+      for (Entry<UUID,TServerInstance> entry : uidMap.entrySet()) {
+        walMarker.removeWalMarker(entry.getValue(), entry.getKey());
       }
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
+    // remove parent znode for dead tablet servers
+    for (Entry<TServerInstance,Set<UUID>> entry : candidates.entrySet()) {
+      if (!liveServers.contains(entry.getKey())) {
+        log.info("Removing znode for " + entry.getKey());
+        try {
+          walMarker.forget(entry.getKey());
+        } catch (WalMarkerException ex) {
+          log.info("Error removing znode for " + entry.getKey() + " " + ex.toString());
+        }
+      }
+    }
     return result;
   }
 
-  private long removeFiles(Map<TServerInstance,Set<Path>> candidates, final GCStatus status) {
-    for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-      for (Path path : entry.getValue()) {
-        log.debug("Removing unused WAL for server " + entry.getKey() + " log " + path);
-        try {
-          if (!useTrash || !fs.moveToTrash(path))
-            fs.deleteRecursively(path);
-          status.currentLog.deleted++;
-        } catch (FileNotFoundException ex) {
-          // ignored
-        } catch (IOException ex) {
-          log.error("Unable to delete wal " + path + ": " + ex);
+  private long removeFiles(Collection<Pair<WalState,Path>> collection, final GCStatus status) {
+    for (Pair<WalState,Path> stateFile : collection) {
+      Path path = stateFile.getSecond();
+      log.debug("Removing " + stateFile.getFirst() + " WAL " + path);
+      try {
+        if (!useTrash || !fs.moveToTrash(path)) {
+          fs.deleteRecursively(path);
         }
+        status.currentLog.deleted++;
+      } catch (FileNotFoundException ex) {
+        // ignored
+      } catch (IOException ex) {
+        log.error("Unable to delete wal " + path + ": " + ex);
       }
     }
     return status.currentLog.deleted;
@@ -260,269 +253,107 @@ public class GarbageCollectWriteAheadLogs {
     return UUID.fromString(path.getName());
   }
 
-  private long removeEntriesInUse(Map<TServerInstance,Set<Path>> candidates, GCStatus status, Set<TServerInstance> liveServers) throws IOException,
-      KeeperException, InterruptedException {
-
-    // remove any entries if there's a log reference, or a tablet is still assigned to the dead server
+  private Map<UUID,TServerInstance> removeEntriesInUse(Map<TServerInstance,Set<UUID>> candidates, Set<TServerInstance> liveServers,
+      Map<UUID,Pair<WalState,Path>> logsState) throws IOException, KeeperException, InterruptedException {
 
-    Map<UUID,TServerInstance> walToDeadServer = new HashMap<>();
-    for (Entry<TServerInstance,Set<Path>> entry : candidates.entrySet()) {
-      for (Path file : entry.getValue()) {
-        walToDeadServer.put(path2uuid(file), entry.getKey());
+    Map<UUID,TServerInstance> result = new HashMap<>();
+    for (Entry<TServerInstance,Set<UUID>> entry : candidates.entrySet()) {
+      for (UUID id : entry.getValue()) {
+        result.put(id, entry.getKey());
       }
     }
-    long count = 0;
-    RootTabletStateStore root = new RootTabletStateStore(context);
-    MetaDataStateStore meta = new MetaDataStateStore(context);
-    Iterator<TabletLocationState> states = Iterators.concat(root.iterator(), meta.iterator());
+
+    // remove any entries if there's a log reference (recovery hasn't finished)
+    Iterator<TabletLocationState> states = store.iterator();
     while (states.hasNext()) {
-      count++;
       TabletLocationState state = states.next();
+
+      // Tablet is still assigned to a dead server. Master has moved markers and reassigned it
+      // Easiest to just ignore all the WALs for the dead server.
       if (state.getState(liveServers) == TabletState.ASSIGNED_TO_DEAD_SERVER) {
-        candidates.remove(state.current);
+        Set<UUID> idsToIgnore = candidates.remove(state.current);
+        if (idsToIgnore != null) {
+          for (UUID id : idsToIgnore) {
+            result.remove(id);
+          }
+        }
       }
+      // Tablet is being recovered and has WAL references: drop every WAL of the dead server
+      // that made those WALs from the set of deletion candidates.
       for (Collection<String> wals : state.walogs) {
         for (String wal : wals) {
           UUID walUUID = path2uuid(new Path(wal));
-          TServerInstance dead = walToDeadServer.get(walUUID);
-          if (dead != null) {
-            Iterator<Path> iter = candidates.get(dead).iterator();
-            while (iter.hasNext()) {
-              if (path2uuid(iter.next()).equals(walUUID)) {
-                iter.remove();
-                break;
-              }
+          TServerInstance dead = result.get(walUUID);
+          // There's a reference to a log file, so skip that server's logs
+          Set<UUID> idsToIgnore = candidates.remove(dead);
+          if (idsToIgnore != null) {
+            for (UUID id : idsToIgnore) {
+              result.remove(id);
             }
           }
         }
       }
     }
-    return count;
-  }
-
-  protected int removeReplicationEntries(Map<TServerInstance,Set<Path>> candidates, GCStatus status) throws IOException, KeeperException, InterruptedException {
-    Connector conn;
-    try {
-      conn = context.getConnector();
-    } catch (AccumuloException | AccumuloSecurityException e) {
-      log.error("Failed to get connector", e);
-      throw new IllegalArgumentException(e);
-    }
-
-    int count = 0;
-
-    Iterator<Entry<TServerInstance,Set<Path>>> walIter = candidates.entrySet().iterator();
-
-    while (walIter.hasNext()) {
-      Entry<TServerInstance,Set<Path>> wal = walIter.next();
-      Iterator<Path> paths = wal.getValue().iterator();
-      while (paths.hasNext()) {
-        Path fullPath = paths.next();
-        if (neededByReplication(conn, fullPath)) {
-          log.debug("Removing WAL from candidate deletion as it is still needed for replication: {}", fullPath);
-          // If we haven't already removed it, check to see if this WAL is
-          // "in use" by replication (needed for replication purposes)
-          status.currentLog.inUse++;
-          paths.remove();
-        } else {
-          log.debug("WAL not needed for replication {}", fullPath);
-        }
-      }
-      if (wal.getValue().isEmpty()) {
-        walIter.remove();
-      }
-      count++;
-    }
 
-    return count;
-  }
-
-  /**
-   * Determine if the given WAL is needed for replication
-   *
-   * @param wal
-   *          The full path (URI)
-   * @return True if the WAL is still needed by replication (not a candidate for deletion)
-   */
-  protected boolean neededByReplication(Connector conn, Path wal) {
-    log.info("Checking replication table for " + wal);
-
-    Iterable<Entry<Key,Value>> iter = getReplicationStatusForFile(conn, wal);
-
-    // TODO Push down this filter to the tserver to only return records
-    // that are not completely replicated and convert this loop into a
-    // `return s.iterator.hasNext()` statement
-    for (Entry<Key,Value> entry : iter) {
-      try {
-        Status status = Status.parseFrom(entry.getValue().get());
-        log.info("Checking if {} is safe for removal with {}", wal, ProtobufUtil.toString(status));
-        if (!StatusUtil.isSafeForRemoval(status)) {
-          return true;
+    // Remove OPEN and CLOSED logs for live servers from the deletion candidates: they are still in use
+    for (TServerInstance liveServer : liveServers) {
+      Set<UUID> idsForServer = candidates.get(liveServer);
+      // Server may not have any logs yet
+      if (idsForServer != null) {
+        for (UUID id : idsForServer) {
+          Pair<WalState,Path> stateFile = logsState.get(id);
+          if (stateFile.getFirst() != WalState.UNREFERENCED) {
+            result.remove(id);
+          }
         }
-      } catch (InvalidProtocolBufferException e) {
-        log.error("Could not deserialize Status protobuf for " + entry.getKey(), e);
       }
     }
-
-    return false;
+    return result;
   }
 
-  protected Iterable<Entry<Key,Value>> getReplicationStatusForFile(Connector conn, Path wal) {
-    Scanner metaScanner;
-    try {
-      metaScanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    } catch (TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-
-    // Need to add in the replication section prefix
-    metaScanner.setRange(Range.exact(ReplicationSection.getRowPrefix() + wal));
-    // Limit the column family to be sure
-    metaScanner.fetchColumnFamily(ReplicationSection.COLF);
-
+  protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) throws IOException, KeeperException, InterruptedException {
+    Connector conn;
     try {
-      Scanner replScanner = ReplicationTable.getScanner(conn);
-
-      // Scan only the Status records
-      StatusSection.limit(replScanner);
-
-      // Only look for this specific WAL
-      replScanner.setRange(Range.exact(wal.toString()));
+      conn = context.getConnector();
+      final Scanner scanner = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
+      scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+      scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+      for (Entry<Key,Value> entry : scanner) {
+        Text file = new Text();
+        MetadataSchema.ReplicationSection.getFile(entry.getKey(), file);
+        UUID id = path2uuid(new Path(file.toString()));
+        candidates.remove(id);
+        log.info("Ignore closed log " + id + " because it is being replicated");
+      }
 
-      return Iterables.concat(metaScanner, replScanner);
-    } catch (ReplicationTableOfflineException e) {
-      // do nothing
+      return candidates.size();
+    } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException e) {
+      log.error("Failed to scan metadata table", e);
+      throw new IllegalArgumentException(e);
     }
-
-    return metaScanner;
   }
 
   /**
-   * Scans log markers. The map passed in is populated with the logs for dead servers.
+   * Scans log markers. The map passed in is populated with the log ids.
    *
-   * @param unusedLogs
+   * @param logsByServer
    *          map of dead server to log file entries
    * @return total number of log files
    */
-  private long getCurrent(Map<TServerInstance,Set<Path>> unusedLogs, Set<TServerInstance> currentServers) throws Exception {
-    // Logs for the Root table are stored in ZooKeeper.
-    Set<Path> rootWALs = getRootLogs(ZooReaderWriter.getInstance(), context.getInstance());
-
-    long count = 0;
-
-    // get all the WAL markers that are not in zookeeper for dead servers
-    Scanner rootScanner = context.getConnector().createScanner(RootTable.NAME, Authorizations.EMPTY);
-    rootScanner.setRange(CurrentLogsSection.getRange());
-    Scanner metaScanner = context.getConnector().createScanner(MetadataTable.NAME, Authorizations.EMPTY);
-    metaScanner.setRange(CurrentLogsSection.getRange());
-    Iterator<Entry<Key,Value>> entries = Iterators.concat(rootScanner.iterator(), metaScanner.iterator());
-    Text hostAndPort = new Text();
-    Text sessionId = new Text();
-    Text filename = new Text();
-    while (entries.hasNext()) {
-      Entry<Key,Value> entry = entries.next();
-
-      CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
-      CurrentLogsSection.getPath(entry.getKey(), filename);
-      TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
-      Path path = new Path(filename.toString());
-
-      // A log is unused iff it's for a tserver which we don't know about or the log is marked as unused and it's not listed as used by the Root table
-      if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
-        Set<Path> logs = unusedLogs.get(tsi);
-        if (logs == null) {
-          unusedLogs.put(tsi, logs = new HashSet<Path>());
-        }
-        if (logs.add(path)) {
-          count++;
-        }
-      }
-    }
-
-    // scan HDFS for logs for dead servers
-    for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
-      addUnusedWalsFromVolume(volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true), unusedLogs, context.getConnector()
-          .getInstance().getZooKeepersSessionTimeOut());
-    }
-    return count;
-  }
-
-  /**
-   * Fetch the WALs which are, or were, referenced by the Root Table
-   *
-   * @return The Set of WALs which are needed by the Root Table
-   */
-  Set<Path> getRootLogs(final ZooReader zoo, Instance instance) throws Exception {
-    final HashSet<Path> rootWALs = new HashSet<>();
-    final String zkRoot = ZooUtil.getRoot(instance);
-
-    // Get entries in zookeeper -- order in ZK_LOG_SUFFIXES is _very_ important
-    for (String pathSuffix : ZK_LOG_SUFFIXES) {
-      addLogsForNode(zoo, zkRoot + pathSuffix, rootWALs);
-    }
-
-    return rootWALs;
-  }
-
-  /**
-   * Read all WALs from the given path in ZooKeeper and add the paths to each WAL to the provided <code>rootWALs</code>
-   *
-   * @param reader
-   *          A reader to ZooKeeper
-   * @param zpath
-   *          The base path to read in ZooKeeper
-   * @param rootWALs
-   *          A Set to collect the WALs in
-   */
-  void addLogsForNode(ZooReader reader, String zpath, HashSet<Path> rootWALs) throws Exception {
-    // Get entries in zookeeper:
-    List<String> children = reader.getChildren(zpath);
-    for (String child : children) {
-      LogEntry entry = LogEntry.fromBytes(reader.getData(zpath + "/" + child, null));
-      rootWALs.add(new Path(entry.filename));
-    }
-  }
+  private long getCurrent(Map<TServerInstance,Set<UUID>> logsByServer, Map<UUID,Pair<WalState,Path>> logState) throws Exception {
 
-  /**
-   * Given a traversal over the <code>wals</code> directory on a {@link Volume}, add all unused WALs
-   *
-   * @param iter
-   *          Iterator over files in the <code>wals</code> directory
-   * @param unusedLogs
-   *          Map tracking unused WALs by server
-   */
-  void addUnusedWalsFromVolume(RemoteIterator<LocatedFileStatus> iter, Map<TServerInstance,Set<Path>> unusedLogs, int zkTimeout) throws Exception {
-    while (iter.hasNext()) {
-      LocatedFileStatus next = iter.next();
-      // recursive listing returns directories, too
-      if (next.isDirectory()) {
-        continue;
-      }
-      // make sure we've waited long enough for zookeeper propagation
-      //
-      // We aren't guaranteed to see the updates through the live tserver set just because the time since the file was
-      // last modified is longer than the ZK timeout.
-      // 1. If we think the server is alive, but it's actually dead, we'll grab it on a later cycle. Which is OK.
-      // 2. If we think the server is dead but it happened to be restarted it's possible to have a server which would differ only by session.
-      // This is also OK because the new TServer will create a new WAL.
-      if (System.currentTimeMillis() - next.getModificationTime() < zkTimeout) {
-        continue;
-      }
-      Path path = next.getPath();
-      String hostPlusPort = path.getParent().getName();
-      // server is still alive, or has a replacement (same host+port, different session)
-      TServerInstance instance = liveServers.find(hostPlusPort);
-      if (instance != null) {
-        continue;
-      }
-      TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
-      Set<Path> paths = unusedLogs.get(fake);
-      if (paths == null) {
-        paths = new HashSet<>();
+    // get all the unused WALs in zookeeper
+    long result = 0;
+    Map<TServerInstance,List<UUID>> markers = walMarker.getAllMarkers();
+    for (Entry<TServerInstance,List<UUID>> entry : markers.entrySet()) {
+      HashSet<UUID> ids = new HashSet<>(entry.getValue().size());
+      for (UUID id : entry.getValue()) {
+        ids.add(id);
+        logState.put(id, walMarker.state(entry.getKey(), id));
+        result++;
       }
-      paths.add(path);
-      unusedLogs.put(fake, paths);
+      logsByServer.put(entry.getKey(), ids);
     }
+    return result;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
index cb4b341..03d2e67 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.java
@@ -17,11 +17,11 @@
 package org.apache.accumulo.gc.replication;
 
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 
 import org.apache.accumulo.core.client.BatchScanner;
 import org.apache.accumulo.core.client.BatchWriter;
@@ -37,7 +37,6 @@ import org.apache.accumulo.core.file.rfile.RFile;
 import org.apache.accumulo.core.master.thrift.MasterClientService;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.CurrentLogsSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ReplicationSection;
 import org.apache.accumulo.core.replication.ReplicationTable;
 import org.apache.accumulo.core.rpc.ThriftUtil;
@@ -48,8 +47,12 @@ import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.trace.Tracer;
 import org.apache.accumulo.core.trace.thrift.TInfo;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalMarkerException;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
+import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.thrift.TException;
@@ -58,9 +61,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Stopwatch;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
 import com.google.common.net.HostAndPort;
 import com.google.protobuf.InvalidProtocolBufferException;
 
@@ -158,58 +158,30 @@ public class CloseWriteAheadLogReferences implements Runnable {
     log.info("Closed " + recordsClosed + " WAL replication references in replication table in " + sw.toString());
   }
 
+  static final EnumSet<WalState> NOT_OPEN = EnumSet.complementOf(EnumSet.of(WalState.OPEN));
+
   /**
-   * Construct the set of referenced WALs from the metadata table
+   * Construct the set of referenced WALs from zookeeper
    *
    * @param conn
    *          Connector
    * @return The Set of WALs that are referenced in the metadata table
    */
   protected HashSet<String> getReferencedWals(Connector conn) {
-    // Make a bounded cache to alleviate repeatedly creating the same Path object
-    final LoadingCache<String,String> normalizedWalPaths = CacheBuilder.newBuilder().maximumSize(1024).concurrencyLevel(1)
-        .build(new CacheLoader<String,String>() {
-
-          @Override
-          public String load(String key) {
-            return new Path(key).toString();
-          }
-
-        });
+    WalMarker wals = new WalMarker(conn.getInstance(), ZooReaderWriter.getInstance());
 
     HashSet<String> referencedWals = new HashSet<>();
-    BatchScanner bs = null;
     try {
-      // TODO Configurable number of threads
-      bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
-      bs.setRanges(Collections.singleton(CurrentLogsSection.getRange()));
-      bs.fetchColumnFamily(CurrentLogsSection.COLF);
-
-      // For each log key/value in the metadata table
-      for (Entry<Key,Value> entry : bs) {
-        if (entry.getValue().equals(CurrentLogsSection.UNUSED)) {
-          continue;
+      for (Entry<Path,WalState> entry : wals.getAllState().entrySet()) {
+        if (NOT_OPEN.contains(entry.getValue())) {
+          Path path = entry.getKey();
+          log.debug("Found WAL " + path.toString());
+          referencedWals.add(path.toString());
         }
-        Text tpath = new Text();
-        CurrentLogsSection.getPath(entry.getKey(), tpath);
-        String path = new Path(tpath.toString()).toString();
-        log.debug("Found WAL " + path.toString());
-
-        // Normalize each log file (using Path) and add it to the set
-        referencedWals.add(normalizedWalPaths.get(path));
       }
-    } catch (TableNotFoundException e) {
-      // uhhhh
+    } catch (WalMarkerException e) {
       throw new RuntimeException(e);
-    } catch (ExecutionException e) {
-      log.error("Failed to normalize WAL file path", e);
-      throw new RuntimeException(e);
-    } finally {
-      if (null != bs) {
-        bs.close();
-      }
     }
-
     return referencedWals;
   }
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/0b487930/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index f431159..9cb32c8 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -16,153 +16,200 @@
  */
 package org.apache.accumulo.gc;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+import java.util.Map.Entry;
 import java.util.UUID;
 
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.core.gc.thrift.GCStatus;
+import org.apache.accumulo.core.gc.thrift.GcCycleStats;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.log.WalMarker;
+import org.apache.accumulo.server.log.WalMarker.WalState;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.accumulo.server.master.state.TabletLocationState;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
 import org.easymock.EasyMock;
 import org.junit.Test;
 
 public class GarbageCollectWriteAheadLogsTest {
 
-  private Map<TServerInstance,Set<Path>> runTest(LiveTServerSet tserverSet, String tserver, long modificationTime) throws Exception {
-    // Mocks
+  private final TServerInstance server1 = new TServerInstance("localhost:1234[SESSION]");
+  private final TServerInstance server2 = new TServerInstance("localhost:1234[OTHERSESS]"); // same host:port as server1, different session string
+  private final UUID id = UUID.randomUUID(); // the one WAL id used by every fixture in this class
+  private final Map<TServerInstance,List<UUID>> markers = Collections.singletonMap(server1, Collections.singletonList(id)); // WAL id listed under server1
+  private final Map<TServerInstance,List<UUID>> markers2 = Collections.singletonMap(server2, Collections.singletonList(id)); // same WAL id listed under server2
+  private final Path path = new Path("hdfs://localhost:9000/accumulo/wal/localhost+1234/" + id); // path handed back by the mocked marker.state(...) calls
+  private final KeyExtent extent = new KeyExtent(new Text("1<"), new Text(new byte[] {0}));
+  private final Collection<Collection<String>> walogs = Collections.emptyList(); // the tablet fixtures carry no WAL entries of their own
+  private final TabletLocationState tabletAssignedToServer1;
+  private final TabletLocationState tabletAssignedToServer2;
+  { // instance initializer: builds both tablet fixtures; any exception from construction is rethrown unchecked
+    try {
+      tabletAssignedToServer1 = new TabletLocationState(extent, (TServerInstance) null, server1, (TServerInstance) null, walogs, false); // NOTE(review): meaning of each positional server argument is not visible here - confirm against TabletLocationState
+      tabletAssignedToServer2 = new TabletLocationState(extent, (TServerInstance) null, server2, (TServerInstance) null, walogs, false);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+  private final Iterable<TabletLocationState> tabletOnServer1List = Collections.singletonList(tabletAssignedToServer1);
+  private final Iterable<TabletLocationState> tabletOnServer2List = Collections.singletonList(tabletAssignedToServer2);
+  private final List<Entry<Key,Value>> emptyList = Collections.emptyList();
+  private final Iterator<Entry<Key,Value>> emptyKV = emptyList.iterator(); // empty scan result, returned by the mocked scanner.iterator()
+
+  @Test
+  public void testRemoveUnusedLog() throws Exception { // an UNREFERENCED WAL whose server is in the live set: file deleted, marker removed
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    final LocatedFileStatus fileStatus = EasyMock.createMock(LocatedFileStatus.class);
-
-    // Concrete objs
-    GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
+    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
 
-    RemoteIterator<LocatedFileStatus> iter = new RemoteIterator<LocatedFileStatus>() {
-      boolean returnedOnce = false;
+    GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
-      @Override
-      public boolean hasNext() throws IOException {
-        return !returnedOnce;
-      }
+    EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); // server1 is the only live server
 
+    EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
+    EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.UNREFERENCED, path));
+    EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once(); // the WAL file must be deleted exactly once
+    marker.removeWalMarker(server1, id); // recorded as an expectation: the mock is not in replay mode yet
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(context, fs, marker, tserverSet);
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List) {
       @Override
-      public LocatedFileStatus next() throws IOException {
-        returnedOnce = true;
-        return fileStatus;
+      protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) throws IOException, KeeperException, InterruptedException { // stub returning 0; this test sets up no Connector/Scanner mocks
+        return 0;
       }
     };
-
-    Map<TServerInstance,Set<Path>> unusedLogs = new HashMap<>();
-
-    // Path is /accumulo/wals/host+port/UUID
-    Path walPath = new Path("/accumulo/wals/" + tserver + "/" + UUID.randomUUID().toString());
-    EasyMock.expect(fileStatus.getPath()).andReturn(walPath).anyTimes();
-
-    EasyMock.expect(fileStatus.getModificationTime()).andReturn(modificationTime);
-    EasyMock.expect(fileStatus.isDirectory()).andReturn(false);
-
-    EasyMock.replay(context, fs, fileStatus, tserverSet);
-
-    gcWals.addUnusedWalsFromVolume(iter, unusedLogs, 0);
-
-    EasyMock.verify(context, fs, fileStatus, tserverSet);
-
-    return unusedLogs;
+    gc.collect(status);
+    EasyMock.verify(context, fs, marker, tserverSet); // fails if any recorded expectation was not satisfied
   }
 
   @Test
-  public void testUnnoticedServerFailure() throws Exception {
+  public void testKeepClosedLog() throws Exception { // a CLOSED WAL whose server is in the live set must be left alone
+    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+    WalMarker marker = EasyMock.createMock(WalMarker.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
-    TServerInstance tserverInstance = EasyMock.createMock(TServerInstance.class);
-    String tserver = "host1+9997";
-
-    // We find the TServer
-    EasyMock.expect(tserverSet.find(tserver)).andReturn(tserverInstance);
 
-    // But the modificationTime for the WAL was _way_ in the past.
-    long modificationTime = 0l;
+    GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
 
-    // If the modification time for a WAL was significantly in the past (so far that the server _should_ have died
-    // by now) but the GC hasn't observed via ZK Watcher that the server died, we would not treat the
-    // WAL as unused.
-    Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
-    // We think the server is still alive, therefore we don't call the WAL unused.
-    assertEquals(0, unusedLogs.size());
+    EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+    EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
+    EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.CLOSED, path));
+    EasyMock.replay(context, marker, tserverSet, fs); // nothing recorded on fs or for removeWalMarker: any such call is unexpected and fails the mock
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List) {
+      @Override
+      protected int removeReplicationEntries(Map<UUID,TServerInstance> candidates) throws IOException, KeeperException, InterruptedException { // stub returning 0; this test sets up no Connector/Scanner mocks
+        return 0;
+      }
+    };
+    gc.collect(status);
+    EasyMock.verify(context, marker, tserverSet, fs);
   }
 
   @Test
-  public void testUnnoticedServerRestart() throws Exception {
+  public void deleteUnreferenceLogOnDeadServer() throws Exception { // marker is under server2, live set holds only server1, tablet fixture names server1: expect deletion
+    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+    WalMarker marker = EasyMock.createMock(WalMarker.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
-    String tserver = "host1+9997";
-
-    // The server was _once_ alive, but we saw it died.
-    // Before the LiveTServerSet gets the Watcher that it's back online (with a new session)
-    // the GC runs
-    EasyMock.expect(tserverSet.find(tserver)).andReturn(null);
-
-    // Modification time for the WAL was _way_ in the past.
-    long modificationTime = 0l;
-
-    Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
-    // If the same server comes back, it will use a new WAL, not the old one. The log should be unused
-    assertEquals(1, unusedLogs.size());
+    Connector conn = EasyMock.createMock(Connector.class);
+    Scanner scanner = EasyMock.createMock(Scanner.class);
+
+    GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+    EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); // server2 is not in the live set
+    EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
+    EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path)); // reported OPEN, yet deletion is still expected below
+    EasyMock.expect(context.getConnector()).andReturn(conn);
+    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner); // removeReplicationEntries is not stubbed here, so the metadata scan is mocked
+    scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+    EasyMock.expectLastCall().once();
+    scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(scanner.iterator()).andReturn(emptyKV); // the scan returns no replication entries
+    EasyMock.expect(fs.deleteRecursively(path)).andReturn(true).once();
+    marker.removeWalMarker(server2, id);
+    EasyMock.expectLastCall().once();
+    marker.forget(server2); // forget(server2) is expected exactly once in addition to the marker removal
+    EasyMock.expectLastCall().once();
+    EasyMock.replay(context, fs, marker, tserverSet, conn, scanner);
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
+    gc.collect(status);
+    EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
   }
 
   @Test
-  public void testAllRootLogsInZk() throws Exception {
-    // Mocks
+  public void ignoreReferenceLogOnDeadServer() throws Exception { // same mocks as deleteUnreferenceLogOnDeadServer, but the tablet fixture names server2: nothing may be deleted
     AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
     VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+    WalMarker marker = EasyMock.createMock(WalMarker.class);
     LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
-    Instance instance = EasyMock.createMock(Instance.class);
-    ZooReader zoo = EasyMock.createMock(ZooReader.class);
-
-    // Fake out some WAL references
-    final String instanceId = UUID.randomUUID().toString();
-    final List<String> currentLogs = Arrays.asList("2");
-    final List<String> walogs = Arrays.asList("1");
-    LogEntry currentLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 2, "host1:9997", "/accumulo/wals/host1+9997/2");
-    LogEntry prevLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 1, "host1:9997", "/accumulo/wals/host1+9997/1");
-
-    GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
-
-    // Define the expectations
-    EasyMock.expect(instance.getInstanceID()).andReturn(instanceId).anyTimes();
-    EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS)).andReturn(currentLogs);
-    EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS)).andReturn(walogs);
-
-    EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS + "/2", null)).andReturn(currentLogEntry.toBytes());
-    EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS + "/1", null)).andReturn(prevLogEntry.toBytes());
-
-    EasyMock.replay(instance, zoo);
-
-    // Ensure that we see logs from both current_logs and walogs
-    Set<Path> rootWals = gcWals.getRootLogs(zoo, instance);
-
-    EasyMock.verify(instance, zoo);
+    Connector conn = EasyMock.createMock(Connector.class);
+    Scanner scanner = EasyMock.createMock(Scanner.class);
+
+    GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+    EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1)); // server2 is not in the live set
+    EasyMock.expect(marker.getAllMarkers()).andReturn(markers2).once();
+    EasyMock.expect(marker.state(server2, id)).andReturn(new Pair<>(WalState.OPEN, path));
+    EasyMock.expect(context.getConnector()).andReturn(conn);
+    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
+    scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+    EasyMock.expectLastCall().once();
+    scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(scanner.iterator()).andReturn(emptyKV); // the scan returns no replication entries
+    EasyMock.replay(context, fs, marker, tserverSet, conn, scanner); // no deleteRecursively/removeWalMarker/forget recorded: any such call fails the mock
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer2List);
+    gc.collect(status);
+    EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
+  }
 
-    assertEquals(2, rootWals.size());
-    assertTrue("Expected to find WAL from walogs", rootWals.remove(new Path("/accumulo/wals/host1+9997/1")));
-    assertTrue("Expected to find WAL from current_logs", rootWals.remove(new Path("/accumulo/wals/host1+9997/2")));
+  @Test
+  public void replicationDelaysFileCollection() throws Exception { // an UNREFERENCED WAL with a replication entry in the metadata scan must not be deleted
+    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+    WalMarker marker = EasyMock.createMock(WalMarker.class);
+    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
+    Connector conn = EasyMock.createMock(Connector.class);
+    Scanner scanner = EasyMock.createMock(Scanner.class);
+    String row = MetadataSchema.ReplicationSection.getRowPrefix() + path.toString(); // replication-section row key built from the WAL's path
+    String colf = MetadataSchema.ReplicationSection.COLF.toString();
+    String colq = "1";
+    Map<Key, Value> replicationWork = Collections.singletonMap(new Key(row, colf, colq), new Value(new byte[0])); // one replication entry with an empty value
+
+
+    GCStatus status = new GCStatus(null, null, null, new GcCycleStats());
+
+    EasyMock.expect(tserverSet.getCurrentServers()).andReturn(Collections.singleton(server1));
+    EasyMock.expect(marker.getAllMarkers()).andReturn(markers).once();
+    EasyMock.expect(marker.state(server1, id)).andReturn(new Pair<WalState,Path>(WalState.UNREFERENCED, path));
+    EasyMock.expect(context.getConnector()).andReturn(conn);
+    EasyMock.expect(conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY)).andReturn(scanner);
+    scanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);
+    EasyMock.expectLastCall().once();
+    scanner.setRange(MetadataSchema.ReplicationSection.getRange());
+    EasyMock.expectLastCall().once();
+    EasyMock.expect(scanner.iterator()).andReturn(replicationWork.entrySet().iterator()); // the scan yields the single replication entry for this WAL
+    EasyMock.replay(context, fs, marker, tserverSet, conn, scanner); // no delete expectations recorded on fs or marker: any such call fails the mock
+    GarbageCollectWriteAheadLogs gc = new GarbageCollectWriteAheadLogs(context, fs, false, tserverSet, marker, tabletOnServer1List);
+    gc.collect(status);
+    EasyMock.verify(context, fs, marker, tserverSet, conn, scanner);
   }
 }