You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/05/10 23:05:51 UTC

[05/19] accumulo git commit: Revert "ACCUMULO-3772 GC must read both current_logs and walogs in ZK"

Revert "ACCUMULO-3772 GC must read both current_logs and walogs in ZK"

This reverts commit 2938cfad91949b205e500bc73137e15e713c4262.

Conflicts:
	server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4004071c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4004071c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4004071c

Branch: refs/heads/1.7
Commit: 4004071c0f3228a0903c618155e4944521f46837
Parents: e7571e2
Author: Josh Elser <el...@apache.org>
Authored: Sat May 9 14:42:04 2015 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Sat May 9 14:42:04 2015 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        | 153 +++++------------
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 168 -------------------
 2 files changed, 39 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/4004071c/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index 194d357..40acf8b 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -32,7 +32,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.Connector;
-import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.data.Key;
@@ -55,7 +54,6 @@ import org.apache.accumulo.core.trace.Span;
 import org.apache.accumulo.core.trace.Trace;
 import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
 import org.apache.accumulo.server.AccumuloServerContext;
 import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
@@ -78,7 +76,6 @@ import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Iterators;
 import com.google.common.net.HostAndPort;
@@ -87,10 +84,6 @@ import com.google.protobuf.InvalidProtocolBufferException;
 public class GarbageCollectWriteAheadLogs {
   private static final Logger log = LoggerFactory.getLogger(GarbageCollectWriteAheadLogs.class);
 
-  // The order of these is _very_ important. Must read from current_logs, then walogs because ZooTabletStateStore writes to
-  // walogs and then removes from current_logs
-  private static final String[] ZK_LOG_SUFFIXES = new String[] {RootTable.ZROOT_TABLET_CURRENT_LOGS, RootTable.ZROOT_TABLET_WALOGS};
-
   private final AccumuloServerContext context;
   private final VolumeManager fs;
   private final boolean useTrash;
@@ -120,26 +113,6 @@ public class GarbageCollectWriteAheadLogs {
     liveServers.startListeningForTabletServerChanges();
   }
 
-  /**
-   * Creates a new GC WAL object. Meant for testing -- allows mocked objects.
-   *
-   * @param context
-   *          the collection server's context
-   * @param fs
-   *          volume manager to use
-   * @param useTrash
-   *          true to move files to trash rather than delete them
-   * @param liveTServerSet
-   *          a started LiveTServerSet instance
-   */
-  @VisibleForTesting
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, LiveTServerSet liveTServerSet) throws IOException {
-    this.context = context;
-    this.fs = fs;
-    this.useTrash = useTrash;
-    this.liveServers = liveTServerSet;
-  }
-
   public void collect(GCStatus status) {
 
     Span span = Trace.start("getCandidates");
@@ -397,6 +370,9 @@ public class GarbageCollectWriteAheadLogs {
     return metaScanner;
   }
 
+
+
+
   /**
    * Scans log markers. The map passed in is populated with the logs for dead servers.
    *
@@ -404,10 +380,16 @@ public class GarbageCollectWriteAheadLogs {
    *          map of dead server to log file entries
    * @return total number of log files
    */
-  private long getCurrent(Map<TServerInstance,Set<Path>> unusedLogs, Set<TServerInstance> currentServers) throws Exception {
-    // Logs for the Root table are stored in ZooKeeper.
-    Set<Path> rootWALs = getRootLogs(ZooReaderWriter.getInstance(), context.getInstance());
-
+  private long getCurrent(Map<TServerInstance, Set<Path> > unusedLogs, Set<TServerInstance> currentServers) throws Exception {
+    Set<Path> rootWALs = new HashSet<>();
+    // Get entries in zookeeper:
+    String zpath = ZooUtil.getRoot(context.getInstance()) + RootTable.ZROOT_TABLET_WALOGS;
+    ZooReaderWriter zoo = ZooReaderWriter.getInstance();
+    List<String> children = zoo.getChildren(zpath);
+    for (String child : children) {
+      LogEntry entry = LogEntry.fromBytes(zoo.getData(zpath + "/" + child, null));
+      rootWALs.add(new Path(entry.filename));
+    }
     long count = 0;
 
     // get all the WAL markers that are not in zookeeper for dead servers
@@ -421,13 +403,10 @@ public class GarbageCollectWriteAheadLogs {
     Text filename = new Text();
     while (entries.hasNext()) {
       Entry<Key,Value> entry = entries.next();
-
       CurrentLogsSection.getTabletServer(entry.getKey(), hostAndPort, sessionId);
       CurrentLogsSection.getPath(entry.getKey(), filename);
       TServerInstance tsi = new TServerInstance(HostAndPort.fromString(hostAndPort.toString()), sessionId.toString());
       Path path = new Path(filename.toString());
-
-      // A log is unused iff it's for a tserver which we don't know about or the log is marked as unused and it's not listed as used by the Root table
       if (!currentServers.contains(tsi) || entry.getValue().equals(CurrentLogsSection.UNUSED) && !rootWALs.contains(path)) {
         Set<Path> logs = unusedLogs.get(tsi);
         if (logs == null) {
@@ -441,88 +420,34 @@ public class GarbageCollectWriteAheadLogs {
 
     // scan HDFS for logs for dead servers
     for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
-      addUnusedWalsFromVolume(volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true), unusedLogs, context.getConnector()
-          .getInstance().getZooKeepersSessionTimeOut());
-    }
-    return count;
-  }
-
-  /**
-   * Fetch the WALs which are, or were, referenced by the Root Table
-   *
-   * @return The Set of WALs which are needed by the Root Table
-   */
-  Set<Path> getRootLogs(final ZooReader zoo, Instance instance) throws Exception {
-    final HashSet<Path> rootWALs = new HashSet<>();
-    final String zkRoot = ZooUtil.getRoot(instance);
-
-    // Get entries in zookeeper -- order in ZK_LOG_SUFFIXES is _very_ important
-    for (String pathSuffix : ZK_LOG_SUFFIXES) {
-      addLogsForNode(zoo, zkRoot + pathSuffix, rootWALs);
-    }
-
-    return rootWALs;
-  }
-
-  /**
-   * Read all WALs from the given path in ZooKeeper and add the paths to each WAL to the provided <code>rootWALs</code>
-   *
-   * @param reader
-   *          A reader to ZooKeeper
-   * @param zpath
-   *          The base path to read in ZooKeeper
-   * @param rootWALs
-   *          A Set to collect the WALs in
-   */
-  void addLogsForNode(ZooReader reader, String zpath, HashSet<Path> rootWALs) throws Exception {
-    // Get entries in zookeeper:
-    List<String> children = reader.getChildren(zpath);
-    for (String child : children) {
-      LogEntry entry = LogEntry.fromBytes(reader.getData(zpath + "/" + child, null));
-      rootWALs.add(new Path(entry.filename));
-    }
-  }
-
-  /**
-   * Given a traversal over the <code>wals</code> directory on a {@link Volume}, add all unused WALs
-   *
-   * @param iter
-   *          Iterator over files in the <code>wals</code> directory
-   * @param unusedLogs
-   *          Map tracking unused WALs by server
-   */
-  void addUnusedWalsFromVolume(RemoteIterator<LocatedFileStatus> iter, Map<TServerInstance,Set<Path>> unusedLogs, int zkTimeout) throws Exception {
-    while (iter.hasNext()) {
-      LocatedFileStatus next = iter.next();
-      // recursive listing returns directories, too
-      if (next.isDirectory()) {
-        continue;
-      }
-      // make sure we've waited long enough for zookeeper propagation
-      //
-      // We aren't guaranteed to see the updates through the live tserver set just because the time since the file was
-      // last modified is longer than the ZK timeout.
-      // 1. If we think the server is alive, but it's actually dead, we'll grab it on a later cycle. Which is OK.
-      // 2. If we think the server is dead but it happened to be restarted it's possible to have a server which would differ only by session.
-      // This is also OK because the new TServer will create a new WAL.
-      if (System.currentTimeMillis() - next.getModificationTime() < zkTimeout) {
-        continue;
-      }
-      Path path = next.getPath();
-      String hostPlusPort = path.getParent().getName();
-      // server is still alive, or has a replacement (same host+port, different session)
-      TServerInstance instance = liveServers.find(hostPlusPort);
-      if (instance != null) {
-        continue;
-      }
-      TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
-      Set<Path> paths = unusedLogs.get(fake);
-      if (paths == null) {
-        paths = new HashSet<>();
+      RemoteIterator<LocatedFileStatus> iter =  volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
+      while (iter.hasNext()) {
+        LocatedFileStatus next = iter.next();
+        // recursive listing returns directories, too
+        if (next.isDirectory()) {
+          continue;
+        }
+        // make sure we've waited long enough for zookeeper propagation
+        if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
+          continue;
+        }
+        Path path = next.getPath();
+        String hostPlusPort = path.getParent().getName();
+        // server is still alive, or has a replacement
+        TServerInstance instance = liveServers.find(hostPlusPort);
+        if (instance != null) {
+          continue;
+        }
+        TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+        Set<Path> paths = unusedLogs.get(fake);
+        if (paths == null) {
+          paths = new HashSet<>();
+        }
+        paths.add(path);
+        unusedLogs.put(fake, paths);
       }
-      paths.add(path);
-      unusedLogs.put(fake, paths);
     }
+    return count;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/4004071c/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
deleted file mode 100644
index f431159..0000000
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.gc;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.data.impl.KeyExtent;
-import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.tabletserver.log.LogEntry;
-import org.apache.accumulo.fate.zookeeper.ZooReader;
-import org.apache.accumulo.server.AccumuloServerContext;
-import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.master.LiveTServerSet;
-import org.apache.accumulo.server.master.state.TServerInstance;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
-import org.apache.hadoop.io.Text;
-import org.easymock.EasyMock;
-import org.junit.Test;
-
-public class GarbageCollectWriteAheadLogsTest {
-
-  private Map<TServerInstance,Set<Path>> runTest(LiveTServerSet tserverSet, String tserver, long modificationTime) throws Exception {
-    // Mocks
-    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
-    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    final LocatedFileStatus fileStatus = EasyMock.createMock(LocatedFileStatus.class);
-
-    // Concrete objs
-    GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
-
-    RemoteIterator<LocatedFileStatus> iter = new RemoteIterator<LocatedFileStatus>() {
-      boolean returnedOnce = false;
-
-      @Override
-      public boolean hasNext() throws IOException {
-        return !returnedOnce;
-      }
-
-      @Override
-      public LocatedFileStatus next() throws IOException {
-        returnedOnce = true;
-        return fileStatus;
-      }
-    };
-
-    Map<TServerInstance,Set<Path>> unusedLogs = new HashMap<>();
-
-    // Path is /accumulo/wals/host+port/UUID
-    Path walPath = new Path("/accumulo/wals/" + tserver + "/" + UUID.randomUUID().toString());
-    EasyMock.expect(fileStatus.getPath()).andReturn(walPath).anyTimes();
-
-    EasyMock.expect(fileStatus.getModificationTime()).andReturn(modificationTime);
-    EasyMock.expect(fileStatus.isDirectory()).andReturn(false);
-
-    EasyMock.replay(context, fs, fileStatus, tserverSet);
-
-    gcWals.addUnusedWalsFromVolume(iter, unusedLogs, 0);
-
-    EasyMock.verify(context, fs, fileStatus, tserverSet);
-
-    return unusedLogs;
-  }
-
-  @Test
-  public void testUnnoticedServerFailure() throws Exception {
-    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
-    TServerInstance tserverInstance = EasyMock.createMock(TServerInstance.class);
-    String tserver = "host1+9997";
-
-    // We find the TServer
-    EasyMock.expect(tserverSet.find(tserver)).andReturn(tserverInstance);
-
-    // But the modificationTime for the WAL was _way_ in the past.
-    long modificationTime = 0l;
-
-    // If the modification time for a WAL was significantly in the past (so far that the server _should_ have died
-    // by now) but the GC hasn't observed via ZK Watcher that the server died, we would not treat the
-    // WAL as unused.
-    Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
-    // We think the server is still alive, therefore we don't call the WAL unused.
-    assertEquals(0, unusedLogs.size());
-  }
-
-  @Test
-  public void testUnnoticedServerRestart() throws Exception {
-    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
-    String tserver = "host1+9997";
-
-    // The server was _once_ alive, but we saw it died.
-    // Before the LiveTServerSet gets the Watcher that it's back online (with a new session)
-    // the GC runs
-    EasyMock.expect(tserverSet.find(tserver)).andReturn(null);
-
-    // Modification time for the WAL was _way_ in the past.
-    long modificationTime = 0l;
-
-    Map<TServerInstance,Set<Path>> unusedLogs = runTest(tserverSet, tserver, modificationTime);
-
-    // If the same server comes back, it will use a new WAL, not the old one. The log should be unused
-    assertEquals(1, unusedLogs.size());
-  }
-
-  @Test
-  public void testAllRootLogsInZk() throws Exception {
-    // Mocks
-    AccumuloServerContext context = EasyMock.createMock(AccumuloServerContext.class);
-    VolumeManager fs = EasyMock.createMock(VolumeManager.class);
-    LiveTServerSet tserverSet = EasyMock.createMock(LiveTServerSet.class);
-    Instance instance = EasyMock.createMock(Instance.class);
-    ZooReader zoo = EasyMock.createMock(ZooReader.class);
-
-    // Fake out some WAL references
-    final String instanceId = UUID.randomUUID().toString();
-    final List<String> currentLogs = Arrays.asList("2");
-    final List<String> walogs = Arrays.asList("1");
-    LogEntry currentLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 2, "host1:9997", "/accumulo/wals/host1+9997/2");
-    LogEntry prevLogEntry = new LogEntry(new KeyExtent(new Text("+r"), null, null), 1, "host1:9997", "/accumulo/wals/host1+9997/1");
-
-    GarbageCollectWriteAheadLogs gcWals = new GarbageCollectWriteAheadLogs(context, fs, true, tserverSet);
-
-    // Define the expectations
-    EasyMock.expect(instance.getInstanceID()).andReturn(instanceId).anyTimes();
-    EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS)).andReturn(currentLogs);
-    EasyMock.expect(zoo.getChildren(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS)).andReturn(walogs);
-
-    EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_CURRENT_LOGS + "/2", null)).andReturn(currentLogEntry.toBytes());
-    EasyMock.expect(zoo.getData(Constants.ZROOT + "/" + instanceId + RootTable.ZROOT_TABLET_WALOGS + "/1", null)).andReturn(prevLogEntry.toBytes());
-
-    EasyMock.replay(instance, zoo);
-
-    // Ensure that we see logs from both current_logs and walogs
-    Set<Path> rootWals = gcWals.getRootLogs(zoo, instance);
-
-    EasyMock.verify(instance, zoo);
-
-    assertEquals(2, rootWals.size());
-    assertTrue("Expected to find WAL from walogs", rootWals.remove(new Path("/accumulo/wals/host1+9997/1")));
-    assertTrue("Expected to find WAL from current_logs", rootWals.remove(new Path("/accumulo/wals/host1+9997/2")));
-  }
-}