You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this plain-text copy.)
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/08/31 19:22:45 UTC

[2/9] accumulo git commit: ACCUMULO-4428 Fix state of GC firstSeenDead map

ACCUMULO-4428 Fix state of GC firstSeenDead map

The GC's map of host->timestamp is used to track when it first saw
a dead tserver; however, a new instance of GarbageCollectWriteAheadLogs
is created for each GC cycle, so this state was lost between cycles.
The state is now owned by SimpleGarbageCollector, which passes it into
the GarbageCollectWriteAheadLogs instance created for each cycle.

Closes apache/accumulo#143

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9edda321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9edda321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9edda321

Branch: refs/heads/1.8
Commit: 9edda3215974d7ee284ecfa08b69fcd1ef686d19
Parents: 661dac3
Author: Adam J. Shook <ad...@gmail.com>
Authored: Wed Aug 31 13:45:59 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 14:30:33 2016 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  7 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 32 +++++----
 .../test/functional/GarbageCollectorIT.java     | 68 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b57b8fc..339b233 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -83,7 +83,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-  private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+  private final Map<HostAndPort,Long> firstSeenDead;
 
   private boolean useTrash;
 
@@ -96,11 +96,14 @@ public class GarbageCollectWriteAheadLogs {
    *          volume manager to use
    * @param useTrash
    *          true to move files to trash rather than delete them
+   * @param firstSeenDead
+   *          mutable map of a host to when it was first seen dead
    */
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.firstSeenDead = firstSeenDead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 98acf9a..879d6b9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.gc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -530,6 +531,9 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
 
     ProbabilitySampler sampler = new ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
 
+    // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track state of when a tablet server went down
+    final Map<HostAndPort,Long> firstSeenDead = new HashMap<>();
+
     while (true) {
       Trace.on("gc", sampler);
 
@@ -574,7 +578,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
+        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash(), firstSeenDead);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index bc9fca3..940b922 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -107,6 +107,7 @@ public class GarbageCollectWriteAheadLogsTest {
   private VolumeManager volMgr;
   private GarbageCollectWriteAheadLogs gcwal;
   private long modTime;
+  private Map<HostAndPort,Long> firstSeenDead;
 
   @Rule
   public TestName testName = new TestName();
@@ -151,7 +152,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     replay(instance, factory, siteConfig);
     AccumuloServerContext context = new AccumuloServerContext(factory);
-    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    firstSeenDead = new HashMap<>();
+    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
     modTime = System.currentTimeMillis();
   }
 
@@ -393,8 +395,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private List<Entry<Key,Value>> replData;
 
-    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
-      super(context, fs, useTrash);
+    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, List<Entry<Key,Value>> replData)
+        throws IOException {
+      super(context, fs, useTrash, firstSeenDead);
       this.replData = replData;
     }
 
@@ -413,7 +416,7 @@ public class GarbageCollectWriteAheadLogsTest {
     LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
     replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 
-    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, firstSeenDead, replData);
 
     replay(conn);
 
@@ -442,7 +445,7 @@ public class GarbageCollectWriteAheadLogsTest {
     Instance inst = new MockInstance(testName.getMethodName());
     AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -485,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     Connector conn = context.getConnector();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -534,7 +537,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
     Entry<Key,Value> entry = Iterables.getOnlyElement(data);
@@ -562,7 +565,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
     Map<Key,Value> data = new HashMap<>();
@@ -666,8 +669,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private boolean holdsLockBool = false;
 
-    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, boolean holdLock)
+        throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
       this.holdsLockBool = holdLock;
     }
 
@@ -701,7 +705,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
     AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
-    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, firstSeenDead, locked);
   }
 
   private Map<String,Path> getEmptyMap() {
@@ -795,8 +799,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
 
-    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
     }
 
     @Override
@@ -846,7 +850,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     try {
       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
-      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false, firstSeenDead);
       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
 
       gcwal2.collect(status);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 202bfac..3e8abe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -29,15 +31,19 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     cfg.setProperty(Property.GC_CYCLE_START, "1");
     cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
     cfg.setProperty(Property.GC_PORT, "0");
+    cfg.setProperty(Property.GC_WAL_DEAD_SERVER_WAIT, "1s");
     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
 
@@ -107,6 +114,10 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
+  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
+      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+  }
+
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -139,6 +150,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void gcDeleteDeadTServerWAL() throws Exception {
+    // Kill GC process
+    killMacGc();
+
+    // Create table and ingest data
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+    String tableId = getConnector().tableOperations().tableIdMap().get("test_ingest");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 10000;
+    vopts.cols = opts.cols = 1;
+    opts.setPrincipal("root");
+    vopts.setPrincipal("root");
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+    // Test WAL log has been created
+    List<String> walsBefore = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
+
+    // Flush and check for no WAL logs
+    c.tableOperations().flush("test_ingest", null, null, true);
+    List<String> walsAfter = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+
+    // Validate WAL file still exists
+    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
+    File wf = new File(walFile);
+    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+
+    // Kill TServer and give it some time to die and master to rebalance
+    killMacTServer();
+    UtilWaitThread.sleep(5000);
+
+    // Restart GC and let it run
+    Process gc = getCluster().exec(SimpleGarbageCollector.class);
+    UtilWaitThread.sleep(60000);
+
+    // Then check the log for proper events
+    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
+    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
+    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+  }
+
+  @Test
   public void gcLotsOfCandidatesIT() throws Exception {
     killMacGc();
 
@@ -284,6 +341,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     return Iterators.size(Arrays.asList(fs.globStatus(path)).iterator());
   }
 
+  private List<String> getWALsForTableId(String tableId) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    Scanner scanner = getConnector().createScanner("accumulo.metadata", Authorizations.EMPTY);
+    scanner.setRange(Range.prefix(new Text(tableId)));
+    scanner.fetchColumnFamily(new Text("log"));
+    List<String> walsList = new ArrayList<String>();
+    for (Entry<Key,Value> e : scanner) {
+      walsList.add(e.getValue().toString());
+    }
+    return walsList;
+  }
+
   public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws Exception {
     conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig());