You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 04:00:02 UTC

[43/50] [abbrv] git commit: ACCUMULO-378 Fix a failed test by reducing the initial delay and period of the DWQ for replication

ACCUMULO-378 Fix a failed test by reducing the initial delay and period of the DWQ for replication


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c3d99ec7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c3d99ec7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c3d99ec7

Branch: refs/heads/ACCUMULO-378
Commit: c3d99ec7fe29f1fa0acce9256475a8ec4c57cfd6
Parents: 215e2b5
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 13:45:28 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 13:45:28 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/conf/Property.java |  4 +++
 .../server/zookeeper/DistributedWorkQueue.java  | 11 ++++++-
 .../replication/ReplicationProcessor.java       |  2 ++
 .../tserver/replication/ReplicationWorker.java  | 21 ++++++++++++-
 .../test/replication/ReplicationWithGCIT.java   | 32 +++++++++++++++-----
 test/src/test/resources/log4j.properties        |  2 +-
 6 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 21aa578..b1ee499 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -482,6 +482,10 @@ public enum Property {
   @Experimental
   REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner", PropertyType.CLASSNAME,
       "Replication WorkAssigner implementation to use"),
+  @Experimental
+  REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"),
+  @Experimental
+  REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work"),
 
   ;
 

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index b1907fb..29c8fcf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -50,6 +50,7 @@ public class DistributedWorkQueue {
   private ZooReaderWriter zoo = ZooReaderWriter.getInstance();
   private String path;
   private AccumuloConfiguration config;
+  private long timerInitialDelay, timerPeriod;
 
   private AtomicInteger numTask = new AtomicInteger(0);
 
@@ -151,8 +152,15 @@ public class DistributedWorkQueue {
   }
   
   public DistributedWorkQueue(String path, AccumuloConfiguration config) {
+    // Preserve the old delay and period
+    this(path, config, new Random().nextInt(60*1000), 60*1000);
+  }
+
+  public DistributedWorkQueue(String path, AccumuloConfiguration config, long timerInitialDelay, long timerPeriod) {
     this.path = path;
     this.config = config;
+    this.timerInitialDelay = timerInitialDelay;
+    this.timerPeriod = timerPeriod;
   }
   
   public void startProcessing(final Processor processor, ThreadPoolExecutor executorService) throws KeeperException, InterruptedException {
@@ -196,6 +204,7 @@ public class DistributedWorkQueue {
     SimpleTimer.getInstance(config).schedule(new Runnable() {
       @Override
       public void run() {
+        log.debug("Looking for work in " + path);
         try {
           lookForWork(processor, zoo.getChildren(path));
         } catch (KeeperException e) {
@@ -204,7 +213,7 @@ public class DistributedWorkQueue {
           log.info("Interrupted looking for work", e);
         }
       }
-    }, r.nextInt(60 * 1000), 60 * 1000);
+    }, timerInitialDelay, timerPeriod);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 481b3e8..149af6e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -123,6 +123,8 @@ public class ReplicationProcessor implements Processor {
     // Replicate that sucker
     Status replicatedStatus = replica.replicate(filePath, status, target);
 
+    log.debug("Completed replication of {} to {}, with new Status [{}]", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
     // If we got a different status
     if (!replicatedStatus.equals(status)) {
       // We actually did some work!

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index 01258de..a223511 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -21,16 +21,21 @@ import java.util.concurrent.ThreadPoolExecutor;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.security.SystemCredentials;
 import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
 import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
  */
 public class ReplicationWorker implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);
 
   private Instance inst;
   private VolumeManager fs;
@@ -49,8 +54,22 @@ public class ReplicationWorker implements Runnable {
 
   @Override
   public void run() {
+    DefaultConfiguration defaultConf = DefaultConfiguration.getDefaultConfiguration();
+    long defaultDelay = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
+    long defaultPeriod = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
+    long delay = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
+    long period = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
     try {
-      new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf).startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get()), executor);
+      DistributedWorkQueue workQueue;
+      if (defaultDelay != delay && defaultPeriod != period) {
+        log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}", delay, period);
+        workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf, delay, period);
+      } else {
+        log.debug("Configuring DistributedWorkQueue with default delay and period");
+        workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf);
+      }
+
+      workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get()), executor);
     } catch (KeeperException | InterruptedException e) {
       throw new RuntimeException(e);
     }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 23da719..2a7a210 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -83,6 +83,8 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
     cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
     cfg.setProperty(Property.REPLICATION_NAME, "master");
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
+    cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
     cfg.setNumTservers(1);
     hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
   }
@@ -331,6 +333,11 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
 
   @Test(timeout = 5 * 60 * 1000)
   public void replicatedStatusEntriesAreDeleted() throws Exception {
+    // Just stop it now, we'll restart it after we restart the tserver
+    for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+      getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+    }
+
     final Connector conn = getConnector();
     log.info("Got connector to MAC");
     String table1 = "table1";
@@ -470,7 +477,12 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
     boolean foundResults = false;
     for (int i = 0; i < 5; i++) {
       s = ReplicationTable.getScanner(conn);
-      if (Iterables.size(s) > 0) {
+      int count = 0;
+      for (Entry<Key,Value> entry : s) {
+        count++;
+        log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+      }
+      if (count > 0) {
         foundResults = true;
         break;
       }
@@ -479,9 +491,16 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
 
     Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
 
+    getCluster().exec(SimpleGarbageCollector.class);
+
+    // Wait for a bit since the GC has to run (should be running after a one second delay)
+    Thread.sleep(5000);
+
     // We expect no records in the metadata table after compaction. We have to poll
     // because we have to wait for the StatusMaker's next iteration which will clean
-    // up the dangling record after we create the record in the replication table
+    // up the dangling *closed* records after we create the record in the replication table.
+    // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
+    log.info("Checking metadata table for replication entries");
     foundResults = true;
     for (int i = 0; i < 5; i++) {
       s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -493,16 +512,13 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
       Thread.sleep(1000);
     }
 
-    Assert.assertFalse("Replication status messages were not cleaned up from metadata table, check why the StatusMaker didn't delete them", foundResults);
+    Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
 
     /**
-     * After we set the begin to Long.MAX_VALUE, the RemoveCompleteReplicationRecords class will start deleting the records which have been closed by
-     * the minor compaction and replicated by the MockReplicaSystem
+     * After we close out and subsequently delete the metadata record, this will propagate to the replication table,
+     * which will cause those records to be deleted after replication occurs
      */
 
-    // Wait for a bit since the GC has to run (should be running after a one second delay)
-    Thread.sleep(5000);
-
     int recordsFound = 0;
     for (int i = 0; i < 10; i++) {
       s = ReplicationTable.getScanner(conn);

http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 407bc28..9f991d1 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -35,7 +35,7 @@ log4j.logger.org.apache.accumulo.core.file.rfile.bcfile=INFO
 log4j.logger.org.apache.accumulo.server.util.ReplicationTableUtil=TRACE
 log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO
 log4j.logger.org.apache.accumulo.core.client.impl.ThriftScanner=INFO
-log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
+#log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
 log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN
 log4j.logger.org.mortbay.log=WARN
 log4j.logger.org.apache.hadoop=WARN