You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/06 22:31:52 UTC

[4/5] accumulo git commit: ACCUMULO-3772 GC must read both current_logs and walogs in ZK

ACCUMULO-3772 GC must read both current_logs and walogs in ZK

If the GC only reads walogs from ZK, it is possible that it will
delete WALs that are not also in the root or metadata table, preventing
the root table from performing recovery.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/2938cfad
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/2938cfad
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/2938cfad

Branch: refs/heads/1.7
Commit: 2938cfad91949b205e500bc73137e15e713c4262
Parents: 4749513
Author: Josh Elser <el...@apache.org>
Authored: Wed May 6 00:22:27 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 6 15:33:58 2015 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        | 146 +++++++++++-----
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 168 +++++++++++++++++++
 2 files changed, 276 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/2938cfad/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index a9a3f65..c880fef 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
@@ -54,6 +55,7 @@ import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -76,6 +78,7 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.net.HostAndPort;
@@ -84,6 +87,10 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
 
+  // The order of these is _very_ important. Must read from current_logs, then walogs because ZooTabletStateStore writes to
+  // walogs and then removes from current_logs
+  private static final String[] ZK_LOG_SUFFIXES = new String[] {RootTable.ZROOT_TABLET_CURRENT_LOGS, RootTable.ZROOT_TABLET_WALOGS};
+
   private final AccumuloServerContext context;
   private final VolumeManager fs;
   private final boolean useTrash;
@@ -113,6 +120,26 @@ public class GarbageCollectWriteAheadLogs {
     liveServers.startListeningForTabletServerChanges();
   }
 
+  /**
+   * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
+   *
+   * @param context
+   *          the collection server's context
+   * @param fs
+   *          volume manager to use
+   * @param useTrash
+   *          true to move files to trash rather than delete them
+   * @param liveTServerSet
+   *          a started LiveTServerSet instance
+   */
+  @VisibleForTesting
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet) throws IOException {
+    this.context = context;
+    this.fs = fs;
+    this.useTrash = useTrash;
+    this.liveServers = liveTServerSet;
+  }
+
   public void collect(GCStatus status) {
 
     Span span = Trace.start("getCandidates");
@@ -375,9 +402,6 @@ public class GarbageCollectWriteAheadLogs {
     return metaScanner;
   }
 
-
-
-
   /**
    * Scans log markers. The map passed in is populated with the logs for dead servers.
    *
@@ -386,15 +410,9 @@ public class GarbageCollectWriteAheadLogs {
    * @return total number of log files
    */
   private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
-    Set<Path> rootWALs = new HashSet<>();
-    // Get entries in zookeeper:
-    String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
-    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
-    List<String> children = zoo.getChildren(zpath);
-    for (String child : children) {
-      LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
-      rootWALs.add(new Path(entry.filename));
-    }
+    // Logs for the Root table are stored in ZooKeeper.
+    Set<Path> rootWALs = getRootLogs(ZooReaderWriter.getInstance(), context.getInstance());
+
     long count = 0;
 
     // get all the WAL markers that are not in zookeeper for dead servers
@@ -408,10 +426,13 @@ public class GarbageCollectWriteAheadLogs {
     Text filename = new Text();
     while (entries.hasNext()) {
       Entry<Key,Value> entry = entries.next();
+
       CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
       CurrentLogsSection.getPath(entry.getKey(), filename);
       TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
       Path path = new Path(filename.toString());
+
+      // A log is unused iff it's for a tserver which we don't know about, or the log is both marked as unused and not listed as used by the Root table
       if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
         Set<Path> logs = unusedLogs.get(tsi);
         if (logs == null) {
@@ -425,34 +446,83 @@ public class GarbageCollectWriteAheadLogs {
 
     // scan HDFS for logs for dead servers
     for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
-      RemoteIterator<LocatedFileStatus> iter =  volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
-      while (iter.hasNext()) {
-        LocatedFileStatus next = iter.next();
-        // recursive listing returns directories, too
-        if (next.isDirectory()) {
-          continue;
-        }
-        // make sure we've waited long enough for zookeeper propagation
-        if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
-          continue;
-        }
-        Path path = next.getPath();
-        String hostPlusPort = path.getParent().getName();
-        // server is still alive, or has a replacement
-        TServerInstance instance = liveServers.find(hostPlusPort);
-        if (instance != null) {
-          continue;
-        }
-        TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
-        Set<Path> paths = unusedLogs.get(fake);
-        if (paths == null) {
-          paths = new HashSet<>();
-        }
-        paths.add(path);
-        unusedLogs.put(fake, paths);
-      }
+      addUnusedWalsFromVolume(volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true), unusedLogs,
+          context.getConnector().getInstance().getZooKeepersSessionTimeOut());
     }
     return count;
   }
 
+  /**
+   * Fetch the WALs which are, or were, referenced by the Root Table
+   * @return The Set of WALs which are needed by the Root Table
+   */
+  Set<Path> getRootLogs(final ZooReader zoo, Instance instance) throws Exception {
+    final HashSet<Path> rootWALs = new HashSet<>();
+    final String zkRoot = ZooUtil.getRoot(instance);
+
+    // Get entries in zookeeper -- order in ZK_LOG_SUFFIXES is _very_ important
+    for (String pathSuffix : ZK_LOG_SUFFIXES) {
+      addLogsForNode(zoo, zkRoot + pathSuffix, rootWALs);
+    }
+
+    return rootWALs;
+  }
+
+  /**
+   * Read all WALs from the given path in ZooKeeper and add the path of each WAL to the provided <code>rootWALs</code>
+   * @param reader A reader to ZooKeeper
+   * @param zpath The base path to read in ZooKeeper
+   * @param rootWALs A Set to collect the WALs in
+   */
+  void addLogsForNode(ZooReader reader, String zpath, HashSet<Path> rootWALs) throws Exception {
+    // Get entries in zookeeper:
+    List<String> children = reader.getChildren(zpath);
+    for (String child : children) {
+      LogEntry entry = LogEntry.fromBytes(reader.getData(zpath + "/" + child, null));
+      rootWALs.add(new Path(entry.filename));
+    }
+  }
+
+  /**
+   * Given a traversal over the <code>wals</code> directory on a {@link Volume}, add all unused WALs to <code>unusedLogs</code>
+   *
+   * @param iter
+   *          Iterator over files in the <code>wals</code> directory
+   * @param unusedLogs
+   *          Map tracking unused WALs by server
+   */
+  void addUnusedWalsFromVolume(RemoteIterator<LocatedFileStatus> iter, Map<TServerInstance,Set<Path>> unusedLogs, int zkTimeout) throws Exception {
+    while (iter.hasNext()) {
+      LocatedFileStatus next = iter.next();
+      // recursive listing returns directories, too
+      if (next.isDirectory()) {
+        continue;
+      }
+      // make sure we've waited long enough for zookeeper propagation
+      //
+      // We aren't guaranteed to see the updates through the live tserver set just because the time since the file was
+      // last modified is longer than the ZK timeout.
+      // 1. If we think the server is alive, but it's actually dead, we'll grab it on a later cycle. Which is OK.
+      // 2. If we think the server is dead but it happened to be restarted it's possible to have a server which would differ only by session.
+      //    This is also OK because the new TServer will create a new WAL.
+      if (System.currentTimeMillis() - next.getModificationTime() < zkTimeout) {
+        continue;
+      }
+      Path path = next.getPath();
+      String hostPlusPort = path.getParent().getName();
+      // server is still alive, or has a replacement (same host+port, different session)
+      TServerInstance instance = liveServers.find(hostPlusPort);
+      if (instance != null) {
+        continue;
+      }
+      TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+      Set<Path> paths = unusedLogs.get(fake);
+      if (paths == null) {
+        paths = new HashSet<>();
+      }
+      paths.add(path);
+      unusedLogs.put(fake, paths);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/2938cfad/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
new file mode 100644
index 0000000..f431159
--- /dev/null
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.gc;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.data.impl.KeyExtent;
+import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
+import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.master.LiveTServerSet;
+import org.apache.accumulo.server.master.state.TServerInstance;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.io.Text;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+public class GarbageCollectWriteAheadLogsTest {
+
+  private Map<TServerInstance,Set<Path>> runTest(LiveTServerSet tserverSet, String tserver, long modificationTime) throws Exception {
+    // Mocks
+    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+    final LocatedFileStatus fileStatus = EasyMock.createMock(LocatedFileStatus.class);
+
+    // Concrete objs
+    GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
+
+    RemoteIterator<LocatedFileStatus> iter = new RemoteIterator<LocatedFileStatus>() {
+      boolean returnedOnce = false;
+
+      @Override
+      public boolean hasNext() throws IOException {
+        return !returnedOnce;
+      }
+
+      @Override
+      public LocatedFileStatus next() throws IOException {
+        returnedOnce = true;
+        return fileStatus;
+      }
+    };
+
+    Map<TServerInstance,Set<Path>> unusedLogs = new HashMap<>();
+
+    // Path is /accumulo/wals/host+port/UUID
+    Path walPath = new Path("/accumulo/wals/" + tserver + "/" + UUID.randomUUID().toString());
+    EasyMock.expect(fileStatus.getPath()).andReturn(walPath).anyTimes();
+
+    EasyMock.expect(fileStatus.getModificationTime()).andReturn(modificationTime);
+    EasyMock.expect(fileStatus.isDirectory()).andReturn(false);
+
+    EasyMock.replay(context, fs, fileStatus, tserverSet);
+
+    gcWals.addUnusedWalsFromVolume(iter, unusedLogs, 0);
+
+    EasyMock.verify(context, fs, fileStatus, tserverSet);
+
+    return unusedLogs;
+  }
+
+  @Test
+  public void testUnnoticedServerFailure() throws Exception {
+    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
+    TServerInstance tserverInstance = EasyMock.createMock(TServerInstance.class);
+    String tserver = "host1+9997";
+
+    // We find the TServer
+    EasyMock.expect(tserverSet.find(tserver)).andReturn(tserverInstance);
+
+    // But the modificationTime for the WAL was _way_ in the past.
+    long modificationTime = 0l;
+
+    // If the modification time for a WAL is significantly in the past (so far that the server _should_ have died
+    // by now) but the GC hasn't observed via ZK Watcher that the server died, we do not treat the
+    // WAL as unused.
+    Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
+
+    // We think the server is still alive, therefore we don't call the WAL unused.
+    assertEquals(0, unusedLogs.size());
+  }
+
+  @Test
+  public void testUnnoticedServerRestart() throws Exception {
+    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
+    String tserver = "host1+9997";
+
+    // The server was _once_ alive, but we saw that it died.
+    // The GC runs before the LiveTServerSet gets the Watcher notification that the server is back online
+    // (with a new session)
+    EasyMock.expect(tserverSet.find(tserver)).andReturn(null);
+
+    // Modification time for the WAL was _way_ in the past.
+    long modificationTime = 0l;
+
+    Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
+
+    // If the same server comes back, it will use a new WAL, not the old one. The log should be unused
+    assertEquals(1, unusedLogs.size());
+  }
+
+  @Test
+  public void testAllRootLogsInZk() throws Exception {
+    // Mocks
+    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
+    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
+    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
+    Instance instance = EasyMock.createMock(Instance.class);
+    ZooReader zoo = EasyMock.createMock(ZooReader.class);
+
+    // Fake out some WAL references
+    final String instanceId = UUID.randomUUID().toString();
+    final List<String> currentLogs = Arrays.asList("2");
+    final List<String> walogs = Arrays.asList("1");
+    LogEntry currentLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 2, "host1:9997", "/accumulo/wals/host1+9997/2");
+    LogEntry prevLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 1, "host1:9997", "/accumulo/wals/host1+9997/1");
+
+    GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
+
+    // Define the expectations
+    EasyMock.expect(instance.getInstanceID()).andReturn(instanceId).anyTimes();
+    EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS)).andReturn(currentLogs);
+    EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS)).andReturn(walogs);
+
+    EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS + "/2", null)).andReturn(currentLogEntry.toBytes());
+    EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS + "/1", null)).andReturn(prevLogEntry.toBytes());
+
+    EasyMock.replay(instance, zoo);
+
+    // Ensure that we see logs from both current_logs and walogs
+    Set<Path> rootWals = gcWals.getRootLogs(zoo, instance);
+
+    EasyMock.verify(instance, zoo);
+
+    assertEquals(2, rootWals.size());
+    assertTrue("Expected to find WAL from walogs", rootWals.remove(new Path("/accumulo/wals/host1+9997/1")));
+    assertTrue("Expected to find WAL from current_logs", rootWals.remove(new Path("/accumulo/wals/host1+9997/2")));
+  }
+}