Posted to commits@accumulo.apache.org by ec...@apache.org on 2015/04/25 01:21:03 UTC

[17/34] accumulo git commit: ACCUMULO-3423 more updates based on [~kturner]'s review

ACCUMULO-3423 more updates based on [~kturner]'s review


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/affff422
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/affff422
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/affff422

Branch: refs/heads/master
Commit: affff422ddd91cd96c8ca84dc824d931a1b72f70
Parents: c71fc12
Author: Eric C. Newton <er...@gmail.com>
Authored: Mon Mar 30 16:05:59 2015 -0400
Committer: Eric C. Newton <er...@gmail.com>
Committed: Mon Mar 30 16:05:59 2015 -0400

----------------------------------------------------------------------
 .../accumulo/server/util/MetadataTableUtil.java |  1 +
 .../gc/GarbageCollectWriteAheadLogs.java        | 34 ++++++++
 .../apache/accumulo/tserver/TabletServer.java   | 24 ++++--
 .../tserver/log/TabletServerLogger.java         | 40 ++++------
 .../accumulo/test/GarbageCollectWALIT.java      | 81 ++++++++++++++++++++
 .../accumulo/test/functional/WALSunnyDayIT.java | 10 +++
 6 files changed, 159 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index f5326bf..edc6189 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -1112,6 +1112,7 @@ public class MetadataTableUtil {
   }
 
   public static void markLogUnused(ClientContext context, ZooLock lock, TServerInstance tabletSession, Set<Path> all) throws AccumuloException {
+    // There could be a marker at the meta and/or root level; mark both as unused
     try {
       BatchWriter root = null;
       BatchWriter meta = null;

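The comment above notes that an "unused" marker may exist at both the root and metadata levels, which is why markLogUnused prepares one BatchWriter per table. Below is a minimal sketch of that pattern using only the public client API; the row and column values are placeholders rather than the real metadata schema, and markUnusedSketch is a hypothetical helper, not the method in this commit.

import java.util.Set;

import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.hadoop.fs.Path;

class MarkLogUnusedSketch {
  // Write the same "unused" marker through two writers, one for the root table and one
  // for the metadata table, since the marker may live at either (or both) levels.
  static void markUnusedSketch(Connector conn, String session, Set<Path> logs) throws Exception {
    BatchWriter root = conn.createBatchWriter(RootTable.NAME, new BatchWriterConfig());
    BatchWriter meta = conn.createBatchWriter(MetadataTable.NAME, new BatchWriterConfig());
    try {
      for (Path path : logs) {
        Mutation m = new Mutation(session);                              // placeholder row
        m.put("log", path.toString(), new Value("unused".getBytes()));   // placeholder columns
        root.addMutation(m);
        meta.addMutation(m);
      }
    } finally {
      root.close();
      meta.close();
    }
  }
}
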
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index d523706..59612ab 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -50,9 +50,12 @@ import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.trace.Trace;
+import org.apache.accumulo.core.volume.Volume;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.AccumuloServerContext;
+import org.apache.accumulo.server.ServerConstants;
 import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.accumulo.server.fs.VolumeManagerImpl;
 import org.apache.accumulo.server.master.LiveTServerSet;
 import org.apache.accumulo.server.master.LiveTServerSet.Listener;
 import org.apache.accumulo.server.master.state.MetaDataStateStore;
@@ -63,7 +66,9 @@ import org.apache.accumulo.server.master.state.TabletState;
 import org.apache.accumulo.server.replication.StatusUtil;
 import org.apache.accumulo.server.replication.proto.Replication.Status;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.Text;
 import org.apache.htrace.Span;
 import org.apache.zookeeper.KeeperException;
@@ -406,6 +411,35 @@ public class GarbageCollectWriteAheadLogs {
       }
     }
 
+    // scan HDFS for logs for dead servers
+    for (Volume volume : VolumeManagerImpl.get().getVolumes()) {
+      RemoteIterator<LocatedFileStatus> iter = volume.getFileSystem().listFiles(volume.prefixChild(ServerConstants.WAL_DIR), true);
+      while (iter.hasNext()) {
+        LocatedFileStatus next = iter.next();
+        // recursive listing returns directories, too
+        if (next.isDirectory()) {
+          continue;
+        }
+        // make sure we've waited long enough for zookeeper propagation
+        if (System.currentTimeMillis() - next.getModificationTime() < context.getConnector().getInstance().getZooKeepersSessionTimeOut()) {
+          continue;
+        }
+        Path path = next.getPath();
+        String hostPlusPort = path.getParent().getName();
+        // server is still alive, or has a replacement
+        TServerInstance instance = liveServers.find(hostPlusPort);
+        if (instance != null) {
+          continue;
+        }
+        TServerInstance fake = new TServerInstance(hostPlusPort, 0L);
+        Set<Path> paths = unusedLogs.get(fake);
+        if (paths == null) {
+          paths = new HashSet<>();
+        }
+        paths.add(path);
+        unusedLogs.put(fake, paths);
+      }
+    }
     return count;
   }
 

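In the new block above, WAL files found under dead servers are grouped per synthetic TServerInstance with a get/null-check/put sequence. On a Java 8+ runtime the same grouping can be expressed with Map.computeIfAbsent; the sketch below mirrors that logic but keys the map by the host:port directory name (a plain String) and uses hypothetical names, purely to stay self-contained.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.hadoop.fs.Path;

class DeadServerWalGroupingSketch {
  // Group each WAL path under the server directory (host:port) it was found in.
  static void addWal(Map<String,Set<Path>> unusedLogs, Path walPath) {
    String hostPlusPort = walPath.getParent().getName();   // e.g. "127.0.0.1:9997"
    unusedLogs.computeIfAbsent(hostPlusPort, k -> new HashSet<>()).add(walPath);
  }

  public static void main(String[] args) {
    Map<String,Set<Path>> unusedLogs = new HashMap<>();
    addWal(unusedLogs, new Path("/accumulo/wal/127.0.0.1:9997/some-wal-uuid"));
    System.out.println(unusedLogs);
  }
}
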
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 436fb12..e28d472 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -2988,7 +2988,14 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   }
 
   // avoid unnecessary redundant markings to meta
-  ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+  final ConcurrentHashMap<DfsLogger, EnumSet<TabletLevel>> metadataTableLogs = new ConcurrentHashMap<>();
+  final Object levelLocks[] = new Object[TabletLevel.values().length];
+  {
+    for (int i = 0; i < levelLocks.length; i++) {
+      levelLocks[i] = new Object();
+    }
+  }
+
 
   // remove any meta entries after a rolled log is no longer referenced
   Set<DfsLogger> closedLogs = new HashSet<>();
@@ -3018,14 +3025,19 @@ public class TabletServer extends AccumuloServerContext implements Runnable {
   public void addLoggersToMetadata(DfsLogger copy, KeyExtent extent) {
     TabletLevel level = TabletLevel.getLevel(extent);
     // serialize the updates to the metadata per level: avoids updating the level more than once
-    synchronized (level) {
-      EnumSet<TabletLevel> set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+    // updating one level may cause updates to other levels, so we need to release the lock on metadataTableLogs
+    synchronized (levelLocks[level.ordinal()]) {
+      EnumSet<TabletLevel> set = null;
+      synchronized (metadataTableLogs) {
+        set = metadataTableLogs.putIfAbsent(copy, EnumSet.of(level));
+      }
       if (set == null || !set.contains(level) || level == TabletLevel.ROOT) {
         log.info("Writing log marker for level " + level + " " + copy.getFileName());
         MetadataTableUtil.addNewLogMarker(this, this.getLock(), this.getTabletSession(), copy.getPath(), extent);
-        if (set != null) {
-          set.add(level);
-        }
+      }
+      synchronized (metadataTableLogs) {
+        set = metadataTableLogs.get(copy);
+        set.add(level);
       }
     }
   }

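The TabletServer change above stops synchronizing on the TabletLevel enum constant itself and instead uses an array of dedicated lock objects indexed by the level's ordinal, presumably so the critical section is scoped to this tablet server rather than to the JVM-wide enum instance. A minimal, self-contained sketch of that pattern follows; Level and updateMarker are stand-in names.

class PerLevelLockSketch {
  enum Level { ROOT, META, NORMAL }   // stand-in for TabletLevel

  // one private lock per enum value, indexed by ordinal()
  private final Object[] levelLocks = new Object[Level.values().length];
  {
    for (int i = 0; i < levelLocks.length; i++) {
      levelLocks[i] = new Object();
    }
  }

  void updateMarker(Level level) {
    synchronized (levelLocks[level.ordinal()]) {
      // at most one thread per level writes its metadata marker at a time
    }
  }
}
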
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 0f3f642..04e7a83 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -77,7 +78,7 @@ public class TabletServerLogger {
 
   // The current logger
   private DfsLogger currentLog = null;
-  private final AtomicReference<DfsLogger> nextLog = new AtomicReference<>(null);
+  private final SynchronousQueue<DfsLogger> nextLog = new SynchronousQueue<>();
   private final ThreadPoolExecutor nextLogMaker = new SimpleThreadPool(1, "WALog creator");
 
   // The current generation of logs.
@@ -149,6 +150,7 @@ public class TabletServerLogger {
     this.maxSize = maxSize;
     this.syncCounter = syncCounter;
     this.flushCounter = flushCounter;
+    startLogMaker();
   }
 
   private DfsLogger initializeLoggers(final AtomicInteger logIdOut) throws IOException {
@@ -198,18 +200,9 @@ public class TabletServerLogger {
     }
 
     try {
-      DfsLogger next = nextLog.getAndSet(null);
-      if (next != null) {
-        log.info("Using next log " + next.getFileName());
-        currentLog = next;
-      } else {
-        DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
-        alog.open(tserver.getClientAddressString());
-        currentLog = alog;
-      }
-      if (nextLog.get() == null) {
-        createNextLog();
-      }
+      DfsLogger next = nextLog.take();
+      log.info("Using next log " + next.getFileName());
+      currentLog = next;
       logId.incrementAndGet();
       return;
     } catch (Exception t) {
@@ -221,12 +214,11 @@ public class TabletServerLogger {
     }
   }
 
-  // callers are synchronized already
-  private void createNextLog() {
-    if (nextLogMaker.getActiveCount() == 0) {
-      nextLogMaker.submit(new Runnable() {
-        @Override
-        public void run() {
+  private void startLogMaker() {
+    nextLogMaker.submit(new Runnable() {
+      @Override
+      public void run() {
+        while (!nextLogMaker.isShutdown()) {
           try {
             log.debug("Creating next WAL");
             DfsLogger alog = new DfsLogger(tserver.getServerConfig(), syncCounter, flushCounter);
@@ -235,17 +227,15 @@ public class TabletServerLogger {
               tserver.addLoggersToMetadata(alog, tablet.getExtent());
             }
             log.debug("Created next WAL " + alog.getFileName());
-            alog = nextLog.getAndSet(alog);
-            if (alog != null) {
-              log.debug("closing unused next log: " + alog.getFileName());
-              alog.close();
+            while (!nextLog.offer(alog, 12, TimeUnit.HOURS)) {
+              log.info("Our WAL was not used for 12 hours: " + alog.getFileName());
             }
           } catch (Exception t) {
             log.error("{}", t.getMessage(), t);
           }
         }
-      });
-    }
+      }
+    });
   }
 
   public void resetLoggers() throws IOException {

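With this change, TabletServerLogger pre-creates the next WAL on a dedicated thread and hands it over through a SynchronousQueue: the maker thread blocks in offer() until the log-switching code take()s the new log (logging a message every 12 hours if nobody does), so at most one spare WAL exists at a time. A minimal sketch of that handoff, with hypothetical names and a String standing in for DfsLogger:

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

class NextLogHandoffSketch {
  private final SynchronousQueue<String> nextLog = new SynchronousQueue<>();

  // Producer: keep one freshly created WAL ready and block until it is consumed.
  void logMakerLoop() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
      String wal = "wal-" + System.nanoTime();   // stand-in for creating and opening a DfsLogger
      while (!nextLog.offer(wal, 12, TimeUnit.HOURS)) {
        System.out.println("pre-created WAL not used for 12 hours: " + wal);
      }
    }
  }

  // Consumer: switching logs simply takes the next pre-created WAL, blocking until one is ready.
  String switchToNextLog() throws InterruptedException {
    return nextLog.take();
  }
}
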
http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
new file mode 100644
index 0000000..0324e4a
--- /dev/null
+++ b/test/src/test/java/org/apache/accumulo/test/GarbageCollectWALIT.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.metadata.MetadataTable;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.fate.util.UtilWaitThread;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ConfigurableMacIT;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class GarbageCollectWALIT extends ConfigurableMacIT {
+
+  @Override
+  protected void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "5s");
+    cfg.setProperty(Property.GC_CYCLE_START, "1s");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+    cfg.setNumTservers(1);
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  @Test(timeout = 2 * 60 * 1000)
+  public void test() throws Exception {
+    // don't let the GC run yet
+    String tableName = getUniqueNames(1)[0];
+    cluster.getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
+    Connector c = getConnector();
+    c.tableOperations().create(tableName);
+    // count the number of WALs in the filesystem
+    assertEquals(2, countWALsInFS(cluster));
+    cluster.getClusterControl().stop(ServerType.TABLET_SERVER);
+    cluster.getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
+    cluster.getClusterControl().start(ServerType.TABLET_SERVER);
+    Iterators.size(c.createScanner(MetadataTable.NAME, Authorizations.EMPTY).iterator());
+    // let GC run
+    UtilWaitThread.sleep(3 * 5 * 1000);
+    assertEquals(2, countWALsInFS(cluster));
+  }
+
+  private int countWALsInFS(MiniAccumuloClusterImpl cluster) throws Exception {
+    FileSystem fs = cluster.getFileSystem();
+    RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(cluster.getConfig().getAccumuloDir() + "/wal"), true);
+    int result = 0;
+    while (iterator.hasNext()) {
+      LocatedFileStatus next = iterator.next();
+      if (!next.isDirectory()) {
+        result++;
+      }
+    }
+    return result;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/accumulo/blob/affff422/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index b8e36bc..3f51c8d 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Iterators;
@@ -122,6 +123,7 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
     for (String table: new String[] { tableName, MetadataTable.NAME, RootTable.NAME} ) {
       c.tableOperations().flush(table, null, null, true);
     }
+    UtilWaitThread.sleep(1000);
     // rolled WAL is no longer in use, but needs to be GC'd
     Map<String,Boolean> walsAfterflush = getWals(c, zoo);
     assertEquals(walsAfterflush.toString(), 3, walsAfterflush.size());
@@ -151,6 +153,7 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
 
     // put some data in the WAL
     assertEquals(0, cluster.exec(SetGoalState.class, "NORMAL").waitFor());
+    verifySomeData(c, tableName, 1000 * 50 + 1);
     writeSomeData(c, tableName, 100, 100);
 
     Map<String,Boolean> walsAfterRestart = getWals(c, zoo);
@@ -163,6 +166,13 @@ public class WALSunnyDayIT extends ConfigurableMacIT {
     assertEquals("logs in use should be 2", 2, countTrue(walsAfterRestartAndGC.values()));
   }
 
+  private void verifySomeData(Connector c, String tableName, int expected) throws Exception {
+    Scanner scan = c.createScanner(tableName, EMPTY);
+    int result = Iterators.size(scan.iterator());
+    scan.close();
+    Assert.assertEquals(expected, result);
+  }
+
   private void writeSomeData(Connector conn, String tableName, int row, int col) throws Exception {
     Random rand = new Random();
     BatchWriter bw = conn.createBatchWriter(tableName, null);