You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2016/08/31 19:22:44 UTC

[1/9] accumulo git commit: ACCUMULO-4428 Fix state of GC firstSeenDead map

Repository: accumulo
Updated Branches:
  refs/heads/1.7 661dac336 -> 8a3cc4f04
  refs/heads/1.8 d28a3ee3e -> 55795c1ca
  refs/heads/master 159560979 -> cca8b896e


ACCUMULO-4428 Fix state of GC firstSeenDead map

The GC's map of host->timestamp is used to track when it first saw a
dead tserver. However, a new instance of GarbageCollectWriteAheadLogs
is created during each cycle of the GC, so that state was lost between
cycles. The state is now managed by SimpleGarbageCollector, which passes
it into each cycle's GarbageCollectWriteAheadLogs instance.

Closes apache/accumulo#143

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9edda321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9edda321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9edda321

Branch: refs/heads/1.7
Commit: 9edda3215974d7ee284ecfa08b69fcd1ef686d19
Parents: 661dac3
Author: Adam J. Shook <ad...@gmail.com>
Authored: Wed Aug 31 13:45:59 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 14:30:33 2016 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  7 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 32 +++++----
 .../test/functional/GarbageCollectorIT.java     | 68 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b57b8fc..339b233 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -83,7 +83,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-  private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+  private final Map<HostAndPort,Long> firstSeenDead;
 
   private boolean useTrash;
 
@@ -96,11 +96,14 @@ public class GarbageCollectWriteAheadLogs {
    *          volume manager to use
    * @param useTrash
    *          true to move files to trash rather than delete them
+   * @param firstSeenDead
+   *          mutable map of a host to when it was first seen dead
    */
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.firstSeenDead = firstSeenDead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 98acf9a..879d6b9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.gc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -530,6 +531,9 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
 
     ProbabilitySampler sampler = new ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
 
+    // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track state of when a tablet server went down
+    final Map<HostAndPort,Long> firstSeenDead = new HashMap<>();
+
     while (true) {
       Trace.on("gc", sampler);
 
@@ -574,7 +578,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
+        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash(), firstSeenDead);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index bc9fca3..940b922 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -107,6 +107,7 @@ public class GarbageCollectWriteAheadLogsTest {
   private VolumeManager volMgr;
   private GarbageCollectWriteAheadLogs gcwal;
   private long modTime;
+  private Map<HostAndPort,Long> firstSeenDead;
 
   @Rule
   public TestName testName = new TestName();
@@ -151,7 +152,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     replay(instance, factory, siteConfig);
     AccumuloServerContext context = new AccumuloServerContext(factory);
-    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    firstSeenDead = new HashMap<>();
+    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
     modTime = System.currentTimeMillis();
   }
 
@@ -393,8 +395,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private List<Entry<Key,Value>> replData;
 
-    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
-      super(context, fs, useTrash);
+    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, List<Entry<Key,Value>> replData)
+        throws IOException {
+      super(context, fs, useTrash, firstSeenDead);
       this.replData = replData;
     }
 
@@ -413,7 +416,7 @@ public class GarbageCollectWriteAheadLogsTest {
     LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
     replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 
-    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, firstSeenDead, replData);
 
     replay(conn);
 
@@ -442,7 +445,7 @@ public class GarbageCollectWriteAheadLogsTest {
     Instance inst = new MockInstance(testName.getMethodName());
     AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -485,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     Connector conn = context.getConnector();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -534,7 +537,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
     Entry<Key,Value> entry = Iterables.getOnlyElement(data);
@@ -562,7 +565,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
     Map<Key,Value> data = new HashMap<>();
@@ -666,8 +669,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private boolean holdsLockBool = false;
 
-    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, boolean holdLock)
+        throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
       this.holdsLockBool = holdLock;
     }
 
@@ -701,7 +705,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
     AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
-    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, firstSeenDead, locked);
   }
 
   private Map<String,Path> getEmptyMap() {
@@ -795,8 +799,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
 
-    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
     }
 
     @Override
@@ -846,7 +850,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     try {
       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
-      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false, firstSeenDead);
       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
 
       gcwal2.collect(status);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 202bfac..3e8abe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -29,15 +31,19 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     cfg.setProperty(Property.GC_CYCLE_START, "1");
     cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
     cfg.setProperty(Property.GC_PORT, "0");
+    cfg.setProperty(Property.GC_WAL_DEAD_SERVER_WAIT, "1s");
     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
 
@@ -107,6 +114,10 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
+  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
+      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+  }
+
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -139,6 +150,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void gcDeleteDeadTServerWAL() throws Exception {
+    // Kill GC process
+    killMacGc();
+
+    // Create table and ingest data
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+    String tableId = getConnector().tableOperations().tableIdMap().get("test_ingest");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 10000;
+    vopts.cols = opts.cols = 1;
+    opts.setPrincipal("root");
+    vopts.setPrincipal("root");
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+    // Test WAL log has been created
+    List<String> walsBefore = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
+
+    // Flush and check for no WAL logs
+    c.tableOperations().flush("test_ingest", null, null, true);
+    List<String> walsAfter = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+
+    // Validate WAL file still exists
+    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
+    File wf = new File(walFile);
+    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+
+    // Kill TServer and give it some time to die and master to rebalance
+    killMacTServer();
+    UtilWaitThread.sleep(5000);
+
+    // Restart GC and let it run
+    Process gc = getCluster().exec(SimpleGarbageCollector.class);
+    UtilWaitThread.sleep(60000);
+
+    // Then check the log for proper events
+    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
+    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
+    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+  }
+
+  @Test
   public void gcLotsOfCandidatesIT() throws Exception {
     killMacGc();
 
@@ -284,6 +341,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     return Iterators.size(Arrays.asList(fs.globStatus(path)).iterator());
   }
 
+  private List<String> getWALsForTableId(String tableId) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    Scanner scanner = getConnector().createScanner("accumulo.metadata", Authorizations.EMPTY);
+    scanner.setRange(Range.prefix(new Text(tableId)));
+    scanner.fetchColumnFamily(new Text("log"));
+    List<String> walsList = new ArrayList<String>();
+    for (Entry<Key,Value> e : scanner) {
+      walsList.add(e.getValue().toString());
+    }
+    return walsList;
+  }
+
   public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws Exception {
     conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig());


[3/9] accumulo git commit: ACCUMULO-4428 Fix state of GC firstSeenDead map

Posted by el...@apache.org.
ACCUMULO-4428 Fix state of GC firstSeenDead map

The GC's map of host->timestamp is used to track when it first saw a
dead tserver. However, a new instance of GarbageCollectWriteAheadLogs
is created during each cycle of the GC, so that state was lost between
cycles. The state is now managed by SimpleGarbageCollector, which passes
it into each cycle's GarbageCollectWriteAheadLogs instance.

Closes apache/accumulo#143

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9edda321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9edda321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9edda321

Branch: refs/heads/master
Commit: 9edda3215974d7ee284ecfa08b69fcd1ef686d19
Parents: 661dac3
Author: Adam J. Shook <ad...@gmail.com>
Authored: Wed Aug 31 13:45:59 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 14:30:33 2016 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  7 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 32 +++++----
 .../test/functional/GarbageCollectorIT.java     | 68 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b57b8fc..339b233 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -83,7 +83,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-  private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+  private final Map<HostAndPort,Long> firstSeenDead;
 
   private boolean useTrash;
 
@@ -96,11 +96,14 @@ public class GarbageCollectWriteAheadLogs {
    *          volume manager to use
    * @param useTrash
    *          true to move files to trash rather than delete them
+   * @param firstSeenDead
+   *          mutable map of a host to when it was first seen dead
    */
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.firstSeenDead = firstSeenDead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 98acf9a..879d6b9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.gc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -530,6 +531,9 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
 
     ProbabilitySampler sampler = new ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
 
+    // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track state of when a tablet server went down
+    final Map<HostAndPort,Long> firstSeenDead = new HashMap<>();
+
     while (true) {
       Trace.on("gc", sampler);
 
@@ -574,7 +578,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
+        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash(), firstSeenDead);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index bc9fca3..940b922 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -107,6 +107,7 @@ public class GarbageCollectWriteAheadLogsTest {
   private VolumeManager volMgr;
   private GarbageCollectWriteAheadLogs gcwal;
   private long modTime;
+  private Map<HostAndPort,Long> firstSeenDead;
 
   @Rule
   public TestName testName = new TestName();
@@ -151,7 +152,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     replay(instance, factory, siteConfig);
     AccumuloServerContext context = new AccumuloServerContext(factory);
-    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    firstSeenDead = new HashMap<>();
+    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
     modTime = System.currentTimeMillis();
   }
 
@@ -393,8 +395,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private List<Entry<Key,Value>> replData;
 
-    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
-      super(context, fs, useTrash);
+    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, List<Entry<Key,Value>> replData)
+        throws IOException {
+      super(context, fs, useTrash, firstSeenDead);
       this.replData = replData;
     }
 
@@ -413,7 +416,7 @@ public class GarbageCollectWriteAheadLogsTest {
     LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
     replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 
-    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, firstSeenDead, replData);
 
     replay(conn);
 
@@ -442,7 +445,7 @@ public class GarbageCollectWriteAheadLogsTest {
     Instance inst = new MockInstance(testName.getMethodName());
     AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -485,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     Connector conn = context.getConnector();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -534,7 +537,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
     Entry<Key,Value> entry = Iterables.getOnlyElement(data);
@@ -562,7 +565,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
     Map<Key,Value> data = new HashMap<>();
@@ -666,8 +669,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private boolean holdsLockBool = false;
 
-    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, boolean holdLock)
+        throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
       this.holdsLockBool = holdLock;
     }
 
@@ -701,7 +705,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
     AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
-    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, firstSeenDead, locked);
   }
 
   private Map<String,Path> getEmptyMap() {
@@ -795,8 +799,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
 
-    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
     }
 
     @Override
@@ -846,7 +850,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     try {
       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
-      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false, firstSeenDead);
       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
 
       gcwal2.collect(status);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 202bfac..3e8abe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -29,15 +31,19 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     cfg.setProperty(Property.GC_CYCLE_START, "1");
     cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
     cfg.setProperty(Property.GC_PORT, "0");
+    cfg.setProperty(Property.GC_WAL_DEAD_SERVER_WAIT, "1s");
     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
 
@@ -107,6 +114,10 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
+  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
+      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+  }
+
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -139,6 +150,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void gcDeleteDeadTServerWAL() throws Exception {
+    // Kill GC process
+    killMacGc();
+
+    // Create table and ingest data
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+    String tableId = getConnector().tableOperations().tableIdMap().get("test_ingest");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 10000;
+    vopts.cols = opts.cols = 1;
+    opts.setPrincipal("root");
+    vopts.setPrincipal("root");
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+    // Test WAL log has been created
+    List<String> walsBefore = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
+
+    // Flush and check for no WAL logs
+    c.tableOperations().flush("test_ingest", null, null, true);
+    List<String> walsAfter = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+
+    // Validate WAL file still exists
+    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
+    File wf = new File(walFile);
+    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+
+    // Kill TServer and give it some time to die and master to rebalance
+    killMacTServer();
+    UtilWaitThread.sleep(5000);
+
+    // Restart GC and let it run
+    Process gc = getCluster().exec(SimpleGarbageCollector.class);
+    UtilWaitThread.sleep(60000);
+
+    // Then check the log for proper events
+    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
+    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
+    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+  }
+
+  @Test
   public void gcLotsOfCandidatesIT() throws Exception {
     killMacGc();
 
@@ -284,6 +341,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     return Iterators.size(Arrays.asList(fs.globStatus(path)).iterator());
   }
 
+  private List<String> getWALsForTableId(String tableId) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    Scanner scanner = getConnector().createScanner("accumulo.metadata", Authorizations.EMPTY);
+    scanner.setRange(Range.prefix(new Text(tableId)));
+    scanner.fetchColumnFamily(new Text("log"));
+    List<String> walsList = new ArrayList<String>();
+    for (Entry<Key,Value> e : scanner) {
+      walsList.add(e.getValue().toString());
+    }
+    return walsList;
+  }
+
   public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws Exception {
     conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig());


[5/9] accumulo git commit: ACCUMULO-4428 Stabilize the new GarbageCollectorIT

Posted by el...@apache.org.
ACCUMULO-4428 Stabilize the new GarbageCollectorIT

Use the ClusterControl on the MiniAccumuloCluster (MAC) to stop and
start processes. Base the verification on the filesystem (the WAL file
being deleted) instead of on log messages from the GC.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8a3cc4f0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8a3cc4f0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8a3cc4f0

Branch: refs/heads/1.8
Commit: 8a3cc4f04c61847a7dcc9abd24a83c072a35b00f
Parents: 9edda32
Author: Josh Elser <el...@apache.org>
Authored: Wed Aug 31 15:13:23 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 15:13:23 2016 -0400

----------------------------------------------------------------------
 .../test/functional/GarbageCollectorIT.java     | 36 ++++++++------------
 1 file changed, 14 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a3cc4f0/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 3e8abe6..ee790be 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -99,9 +99,9 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException {
+  private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException, IOException {
     // kill gc started by MAC
-    getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next());
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
     // delete lock in zookeeper if there, this will allow next GC to start quickly
     String path = ZooUtil.getRoot(new ZooKeeperInstance(getCluster().getClientConfig())) + Constants.ZGC_LOCK;
     ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET);
@@ -114,10 +114,6 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
-  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
-      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
-  }
-
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -170,29 +166,25 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     // Test WAL log has been created
     List<String> walsBefore = getWALsForTableId(tableId);
     Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
-
-    // Flush and check for no WAL logs
-    c.tableOperations().flush("test_ingest", null, null, true);
-    List<String> walsAfter = getWALsForTableId(tableId);
-    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+    final File walToBeDeleted = new File(walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", ""));
 
     // Validate WAL file still exists
-    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
-    File wf = new File(walFile);
-    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+    Assert.assertEquals("WAL file does not exist", true, walToBeDeleted.exists());
 
-    // Kill TServer and give it some time to die and master to rebalance
-    killMacTServer();
+    // Kill TServers and give it some time to die
+    getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
     UtilWaitThread.sleep(5000);
+    // Restart them or the GC won't ever be able to run a cycle
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
 
     // Restart GC and let it run
-    Process gc = getCluster().exec(SimpleGarbageCollector.class);
-    UtilWaitThread.sleep(60000);
+    getCluster().getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
 
-    // Then check the log for proper events
-    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
-    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
-    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+    log.info("Waiting for garbage collector to delete the WAL {}", walToBeDeleted);
+    while (walToBeDeleted.exists()) {
+      // Wait for the file to be deleted
+      Thread.sleep(2000);
+    }
   }
 
   @Test


[8/9] accumulo git commit: Merge branch '1.7' into 1.8

Posted by el...@apache.org.
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/55795c1c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/55795c1c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/55795c1c

Branch: refs/heads/master
Commit: 55795c1caeaf78419ac5177b8870ea5b1631f4ac
Parents: d28a3ee 8a3cc4f
Author: Josh Elser <el...@apache.org>
Authored: Wed Aug 31 15:22:20 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 15:22:20 2016 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[7/9] accumulo git commit: Merge branch '1.7' into 1.8

Posted by el...@apache.org.
Merge branch '1.7' into 1.8


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/55795c1c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/55795c1c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/55795c1c

Branch: refs/heads/1.8
Commit: 55795c1caeaf78419ac5177b8870ea5b1631f4ac
Parents: d28a3ee 8a3cc4f
Author: Josh Elser <el...@apache.org>
Authored: Wed Aug 31 15:22:20 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 15:22:20 2016 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[9/9] accumulo git commit: Merge branch '1.8'

Posted by el...@apache.org.
Merge branch '1.8'


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/cca8b896
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/cca8b896
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/cca8b896

Branch: refs/heads/master
Commit: cca8b896e38ec2d085100d915a4f9a94b73b086f
Parents: 1595609 55795c1
Author: Josh Elser <el...@apache.org>
Authored: Wed Aug 31 15:22:30 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 15:22:30 2016 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[4/9] accumulo git commit: ACCUMULO-4428 Stabilize the new GarbageCollectorIT

Posted by el...@apache.org.
ACCUMULO-4428 Stabilize the new GarbageCollectorIT

Use the ClusterControl on the MiniAccumuloCluster (MAC) to stop and
start processes. Base the verification on the filesystem (the WAL file
being deleted) instead of on log messages from the GC.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8a3cc4f0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8a3cc4f0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8a3cc4f0

Branch: refs/heads/1.7
Commit: 8a3cc4f04c61847a7dcc9abd24a83c072a35b00f
Parents: 9edda32
Author: Josh Elser <el...@apache.org>
Authored: Wed Aug 31 15:13:23 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 15:13:23 2016 -0400

----------------------------------------------------------------------
 .../test/functional/GarbageCollectorIT.java     | 36 ++++++++------------
 1 file changed, 14 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a3cc4f0/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 3e8abe6..ee790be 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -99,9 +99,9 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException {
+  private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException, IOException {
     // kill gc started by MAC
-    getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next());
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
     // delete lock in zookeeper if there, this will allow next GC to start quickly
     String path = ZooUtil.getRoot(new ZooKeeperInstance(getCluster().getClientConfig())) + Constants.ZGC_LOCK;
     ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET);
@@ -114,10 +114,6 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
-  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
-      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
-  }
-
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -170,29 +166,25 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     // Test WAL log has been created
     List<String> walsBefore = getWALsForTableId(tableId);
     Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
-
-    // Flush and check for no WAL logs
-    c.tableOperations().flush("test_ingest", null, null, true);
-    List<String> walsAfter = getWALsForTableId(tableId);
-    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+    final File walToBeDeleted = new File(walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", ""));
 
     // Validate WAL file still exists
-    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
-    File wf = new File(walFile);
-    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+    Assert.assertEquals("WAL file does not exist", true, walToBeDeleted.exists());
 
-    // Kill TServer and give it some time to die and master to rebalance
-    killMacTServer();
+    // Kill TServers and give it some time to die
+    getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
     UtilWaitThread.sleep(5000);
+    // Restart them or the GC won't ever be able to run a cycle
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
 
     // Restart GC and let it run
-    Process gc = getCluster().exec(SimpleGarbageCollector.class);
-    UtilWaitThread.sleep(60000);
+    getCluster().getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
 
-    // Then check the log for proper events
-    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
-    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
-    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+    log.info("Waiting for garbage collector to delete the WAL {}", walToBeDeleted);
+    while (walToBeDeleted.exists()) {
+      // Wait for the file to be deleted
+      Thread.sleep(2000);
+    }
   }
 
   @Test


[2/9] accumulo git commit: ACCUMULO-4428 Fix state of GC firstSeenDead map

Posted by el...@apache.org.
ACCUMULO-4428 Fix state of GC firstSeenDead map

The GC's map of host->timestamp is used to track when it first saw a
dead tserver. However, a new instance of GarbageCollectWriteAheadLogs
is created during each cycle of the GC, so that state was lost between
cycles. The state is now managed by SimpleGarbageCollector, which passes
it into each cycle's GarbageCollectWriteAheadLogs instance.

Closes apache/accumulo#143

Signed-off-by: Josh Elser <el...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9edda321
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9edda321
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9edda321

Branch: refs/heads/1.8
Commit: 9edda3215974d7ee284ecfa08b69fcd1ef686d19
Parents: 661dac3
Author: Adam J. Shook <ad...@gmail.com>
Authored: Wed Aug 31 13:45:59 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 14:30:33 2016 -0400

----------------------------------------------------------------------
 .../gc/GarbageCollectWriteAheadLogs.java        |  7 +-
 .../accumulo/gc/SimpleGarbageCollector.java     |  6 +-
 .../gc/GarbageCollectWriteAheadLogsTest.java    | 32 +++++----
 .../test/functional/GarbageCollectorIT.java     | 68 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
index b57b8fc..339b233 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogs.java
@@ -83,7 +83,7 @@ public class GarbageCollectWriteAheadLogs {
 
   private final AccumuloServerContext context;
   private final VolumeManager fs;
-  private final Map<HostAndPort,Long> firstSeenDead = new HashMap<HostAndPort,Long>();
+  private final Map<HostAndPort,Long> firstSeenDead;
 
   private boolean useTrash;
 
@@ -96,11 +96,14 @@ public class GarbageCollectWriteAheadLogs {
    *          volume manager to use
    * @param useTrash
    *          true to move files to trash rather than delete them
+   * @param firstSeenDead
+   *          mutable map of a host to when it was first seen dead
    */
-  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash) throws IOException {
+  GarbageCollectWriteAheadLogs(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
     this.context = context;
     this.fs = fs;
     this.useTrash = useTrash;
+    this.firstSeenDead = firstSeenDead;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
----------------------------------------------------------------------
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 98acf9a..879d6b9 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -19,6 +19,7 @@ package org.apache.accumulo.gc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -530,6 +531,9 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
 
     ProbabilitySampler sampler = new ProbabilitySampler(getConfiguration().getFraction(Property.GC_TRACE_PERCENT));
 
+    // Map of tserver -> timestamp, used by the GCWriteAheadLogs to track state of when a tablet server went down
+    final Map<HostAndPort,Long> firstSeenDead = new HashMap<>();
+
     while (true) {
       Trace.on("gc", sampler);
 
@@ -574,7 +578,7 @@ public class SimpleGarbageCollector extends AccumuloServerContext implements Ifa
       // Clean up any unused write-ahead logs
       Span waLogs = Trace.start("walogs");
       try {
-        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash());
+        GarbageCollectWriteAheadLogs walogCollector = new GarbageCollectWriteAheadLogs(this, fs, isUsingTrash(), firstSeenDead);
         log.info("Beginning garbage collection of write-ahead logs");
         walogCollector.collect(status);
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
----------------------------------------------------------------------
diff --git a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
index bc9fca3..940b922 100644
--- a/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
+++ b/server/gc/src/test/java/org/apache/accumulo/gc/GarbageCollectWriteAheadLogsTest.java
@@ -107,6 +107,7 @@ public class GarbageCollectWriteAheadLogsTest {
   private VolumeManager volMgr;
   private GarbageCollectWriteAheadLogs gcwal;
   private long modTime;
+  private Map<HostAndPort,Long> firstSeenDead;
 
   @Rule
   public TestName testName = new TestName();
@@ -151,7 +152,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
     replay(instance, factory, siteConfig);
     AccumuloServerContext context = new AccumuloServerContext(factory);
-    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    firstSeenDead = new HashMap<>();
+    gcwal = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
     modTime = System.currentTimeMillis();
   }
 
@@ -393,8 +395,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private List<Entry<Key,Value>> replData;
 
-    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, List<Entry<Key,Value>> replData) throws IOException {
-      super(context, fs, useTrash);
+    ReplicationGCWAL(AccumuloServerContext context, VolumeManager fs, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, List<Entry<Key,Value>> replData)
+        throws IOException {
+      super(context, fs, useTrash, firstSeenDead);
       this.replData = replData;
     }
 
@@ -413,7 +416,7 @@ public class GarbageCollectWriteAheadLogsTest {
     LinkedList<Entry<Key,Value>> replData = new LinkedList<>();
     replData.add(Maps.immutableEntry(new Key("/wals/" + file1, StatusSection.NAME.toString(), "1"), StatusUtil.fileCreatedValue(System.currentTimeMillis())));
 
-    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, replData);
+    ReplicationGCWAL replGC = new ReplicationGCWAL(null, volMgr, false, firstSeenDead, replData);
 
     replay(conn);
 
@@ -442,7 +445,7 @@ public class GarbageCollectWriteAheadLogsTest {
     Instance inst = new MockInstance(testName.getMethodName());
     AccumuloServerContext context = new AccumuloServerContext(new ServerConfigurationFactory(inst));
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -485,7 +488,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     Connector conn = context.getConnector();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     long file1CreateTime = System.currentTimeMillis();
     long file2CreateTime = file1CreateTime + 50;
@@ -534,7 +537,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> data = gcWALs.getReplicationStatusForFile(conn, wal);
     Entry<Key,Value> entry = Iterables.getOnlyElement(data);
@@ -562,7 +565,7 @@ public class GarbageCollectWriteAheadLogsTest {
     bw.addMutation(m);
     bw.close();
 
-    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false);
+    GarbageCollectWriteAheadLogs gcWALs = new GarbageCollectWriteAheadLogs(context, volMgr, false, firstSeenDead);
 
     Iterable<Entry<Key,Value>> iter = gcWALs.getReplicationStatusForFile(conn, wal);
     Map<Key,Value> data = new HashMap<>();
@@ -666,8 +669,9 @@ public class GarbageCollectWriteAheadLogsTest {
 
     private boolean holdsLockBool = false;
 
-    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, boolean holdLock) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALPartialMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead, boolean holdLock)
+        throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
       this.holdsLockBool = holdLock;
     }
 
@@ -701,7 +705,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
   private GCWALPartialMock getGCWALForRemoveFileTest(GCStatus s, final boolean locked) throws IOException {
     AccumuloServerContext ctx = new AccumuloServerContext(new ServerConfigurationFactory(new MockInstance("accumulo")));
-    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, locked);
+    return new GCWALPartialMock(ctx, VolumeManagerImpl.get(), false, firstSeenDead, locked);
   }
 
   private Map<String,Path> getEmptyMap() {
@@ -795,8 +799,8 @@ public class GarbageCollectWriteAheadLogsTest {
 
   class GCWALDeadTserverCollectMock extends GarbageCollectWriteAheadLogs {
 
-    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash) throws IOException {
-      super(ctx, vm, useTrash);
+    public GCWALDeadTserverCollectMock(AccumuloServerContext ctx, VolumeManager vm, boolean useTrash, Map<HostAndPort,Long> firstSeenDead) throws IOException {
+      super(ctx, vm, useTrash, firstSeenDead);
     }
 
     @Override
@@ -846,7 +850,7 @@ public class GarbageCollectWriteAheadLogsTest {
 
     try {
       VolumeManager vm = VolumeManagerImpl.getLocal(walDir.toString());
-      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false);
+      GarbageCollectWriteAheadLogs gcwal2 = new GCWALDeadTserverCollectMock(ctx, vm, false, firstSeenDead);
       GCStatus status = new GCStatus(new GcCycleStats(), new GcCycleStats(), new GcCycleStats(), new GcCycleStats());
 
       gcwal2.collect(status);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/9edda321/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 202bfac..3e8abe6 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -19,7 +19,9 @@ package org.apache.accumulo.test.functional;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
@@ -29,15 +31,19 @@ import java.util.Map.Entry;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.cli.BatchWriterOpts;
 import org.apache.accumulo.core.cli.ScannerOpts;
+import org.apache.accumulo.core.client.AccumuloException;
+import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
 import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.ZooKeeperInstance;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema;
@@ -85,6 +91,7 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     cfg.setProperty(Property.GC_CYCLE_START, "1");
     cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
     cfg.setProperty(Property.GC_PORT, "0");
+    cfg.setProperty(Property.GC_WAL_DEAD_SERVER_WAIT, "1s");
     cfg.setProperty(Property.TSERV_MAXMEM, "5K");
     cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
 
@@ -107,6 +114,10 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
+  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
+      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
+  }
+
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -139,6 +150,52 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
   }
 
   @Test
+  public void gcDeleteDeadTServerWAL() throws Exception {
+    // Kill GC process
+    killMacGc();
+
+    // Create table and ingest data
+    Connector c = getConnector();
+    c.tableOperations().create("test_ingest");
+    c.tableOperations().setProperty("test_ingest", Property.TABLE_SPLIT_THRESHOLD.getKey(), "5K");
+    String tableId = getConnector().tableOperations().tableIdMap().get("test_ingest");
+    TestIngest.Opts opts = new TestIngest.Opts();
+    VerifyIngest.Opts vopts = new VerifyIngest.Opts();
+    vopts.rows = opts.rows = 10000;
+    vopts.cols = opts.cols = 1;
+    opts.setPrincipal("root");
+    vopts.setPrincipal("root");
+    TestIngest.ingest(c, opts, new BatchWriterOpts());
+
+    // Test WAL log has been created
+    List<String> walsBefore = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
+
+    // Flush and check for no WAL logs
+    c.tableOperations().flush("test_ingest", null, null, true);
+    List<String> walsAfter = getWALsForTableId(tableId);
+    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+
+    // Validate WAL file still exists
+    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
+    File wf = new File(walFile);
+    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+
+    // Kill TServer and give it some time to die and master to rebalance
+    killMacTServer();
+    UtilWaitThread.sleep(5000);
+
+    // Restart GC and let it run
+    Process gc = getCluster().exec(SimpleGarbageCollector.class);
+    UtilWaitThread.sleep(60000);
+
+    // Then check the log for proper events
+    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
+    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
+    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+  }
+
+  @Test
   public void gcLotsOfCandidatesIT() throws Exception {
     killMacGc();
 
@@ -284,6 +341,17 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     return Iterators.size(Arrays.asList(fs.globStatus(path)).iterator());
   }
 
+  private List<String> getWALsForTableId(String tableId) throws TableNotFoundException, AccumuloException, AccumuloSecurityException {
+    Scanner scanner = getConnector().createScanner("accumulo.metadata", Authorizations.EMPTY);
+    scanner.setRange(Range.prefix(new Text(tableId)));
+    scanner.fetchColumnFamily(new Text("log"));
+    List<String> walsList = new ArrayList<String>();
+    for (Entry<Key,Value> e : scanner) {
+      walsList.add(e.getValue().toString());
+    }
+    return walsList;
+  }
+
   public static void addEntries(Connector conn, BatchWriterOpts bwOpts) throws Exception {
     conn.securityOperations().grantTablePermission(conn.whoami(), MetadataTable.NAME, TablePermission.WRITE);
     BatchWriter bw = conn.createBatchWriter(MetadataTable.NAME, bwOpts.getBatchWriterConfig());


[6/9] accumulo git commit: ACCUMULO-4428 Stabilize the new GarbageCollectorIT

Posted by el...@apache.org.
ACCUMULO-4428 Stabilize the new GarbageCollectorIT

Use the clusterControl on MAC to alter the state. Make
the verification based on the filesystem instead of log
messages from the GC.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/8a3cc4f0
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/8a3cc4f0
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/8a3cc4f0

Branch: refs/heads/master
Commit: 8a3cc4f04c61847a7dcc9abd24a83c072a35b00f
Parents: 9edda32
Author: Josh Elser <el...@apache.org>
Authored: Wed Aug 31 15:13:23 2016 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Aug 31 15:13:23 2016 -0400

----------------------------------------------------------------------
 .../test/functional/GarbageCollectorIT.java     | 36 ++++++++------------
 1 file changed, 14 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/8a3cc4f0/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index 3e8abe6..ee790be 100644
--- a/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -99,9 +99,9 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
 
-  private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException {
+  private void killMacGc() throws ProcessNotFoundException, InterruptedException, KeeperException, IOException {
     // kill gc started by MAC
-    getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next());
+    getCluster().getClusterControl().stop(ServerType.GARBAGE_COLLECTOR);
     // delete lock in zookeeper if there, this will allow next GC to start quickly
     String path = ZooUtil.getRoot(new ZooKeeperInstance(getCluster().getClientConfig())) + Constants.ZGC_LOCK;
     ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, OUR_SECRET);
@@ -114,10 +114,6 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
   }
 
-  private void killMacTServer() throws ProcessNotFoundException, InterruptedException, KeeperException {
-      getCluster().killProcess(ServerType.TABLET_SERVER, getCluster().getProcesses().get(ServerType.TABLET_SERVER).iterator().next());
-  }
-
   @Test
   public void gcTest() throws Exception {
     killMacGc();
@@ -170,29 +166,25 @@ public class GarbageCollectorIT extends ConfigurableMacIT {
     // Test WAL log has been created
     List<String> walsBefore = getWALsForTableId(tableId);
     Assert.assertEquals("Should be one WAL", 1, walsBefore.size());
-
-    // Flush and check for no WAL logs
-    c.tableOperations().flush("test_ingest", null, null, true);
-    List<String> walsAfter = getWALsForTableId(tableId);
-    Assert.assertEquals("Should be no WALs", 0, walsAfter.size());
+    final File walToBeDeleted = new File(walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", ""));
 
     // Validate WAL file still exists
-    String walFile = walsBefore.get(0).split("\\|")[0].replaceFirst("file:///", "");
-    File wf = new File(walFile);
-    Assert.assertEquals("WAL file does not exist", true, wf.exists());
+    Assert.assertEquals("WAL file does not exist", true, walToBeDeleted.exists());
 
-    // Kill TServer and give it some time to die and master to rebalance
-    killMacTServer();
+    // Kill TServers and give it some time to die
+    getCluster().getClusterControl().stop(ServerType.TABLET_SERVER);
     UtilWaitThread.sleep(5000);
+    // Restart them or the GC won't ever be able to run a cycle
+    getCluster().getClusterControl().start(ServerType.TABLET_SERVER);
 
     // Restart GC and let it run
-    Process gc = getCluster().exec(SimpleGarbageCollector.class);
-    UtilWaitThread.sleep(60000);
+    getCluster().getClusterControl().start(ServerType.GARBAGE_COLLECTOR);
 
-    // Then check the log for proper events
-    String output = FunctionalTestUtils.readAll(getCluster(), SimpleGarbageCollector.class, gc);
-    assertTrue("WAL GC should have started", output.contains("Beginning garbage collection of write-ahead logs"));
-    assertTrue("WAL was not removed even though tserver was down", output.contains("Removing WAL for offline server"));
+    log.info("Waiting for garbage collector to delete the WAL {}", walToBeDeleted);
+    while (walToBeDeleted.exists()) {
+      // Wait for the file to be deleted
+      Thread.sleep(2000);
+    }
   }
 
   @Test