You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/21 04:00:02 UTC
[43/50] [abbrv] git commit: ACCUMULO-378 Fix a failed test by reducing the initial delay and period of the DWQ for replication
ACCUMULO-378 Fix a failed test by reducing the initial delay and period of the DWQ for replication
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/c3d99ec7
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/c3d99ec7
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/c3d99ec7
Branch: refs/heads/ACCUMULO-378
Commit: c3d99ec7fe29f1fa0acce9256475a8ec4c57cfd6
Parents: 215e2b5
Author: Josh Elser <el...@apache.org>
Authored: Tue May 20 13:45:28 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Tue May 20 13:45:28 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/conf/Property.java | 4 +++
.../server/zookeeper/DistributedWorkQueue.java | 11 ++++++-
.../replication/ReplicationProcessor.java | 2 ++
.../tserver/replication/ReplicationWorker.java | 21 ++++++++++++-
.../test/replication/ReplicationWithGCIT.java | 32 +++++++++++++++-----
test/src/test/resources/log4j.properties | 2 +-
6 files changed, 61 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 21aa578..b1ee499 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -482,6 +482,10 @@ public enum Property {
@Experimental
REPLICATION_WORK_ASSIGNER("replication.work.assigner", "org.apache.accumulo.master.replication.DistributedWorkQueueWorkAssigner", PropertyType.CLASSNAME,
"Replication WorkAssigner implementation to use"),
+ @Experimental
+ REPLICATION_WORK_PROCESSOR_DELAY("replication.work.processor.delay", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before first checking for replication work"),
+ @Experimental
+ REPLICATION_WORK_PROCESSOR_PERIOD("replication.work.processor.period", "0s", PropertyType.TIMEDURATION, "Amount of time to wait before re-checking for replication work"),
;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index b1907fb..29c8fcf 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -50,6 +50,7 @@ public class DistributedWorkQueue {
private ZooReaderWriter zoo = ZooReaderWriter.getInstance();
private String path;
private AccumuloConfiguration config;
+ private long timerInitialDelay, timerPeriod;
private AtomicInteger numTask = new AtomicInteger(0);
@@ -151,8 +152,15 @@ public class DistributedWorkQueue {
}
public DistributedWorkQueue(String path, AccumuloConfiguration config) {
+ // Preserve the old delay and period
+ this(path, config, new Random().nextInt(60*1000), 60*1000);
+ }
+
+ public DistributedWorkQueue(String path, AccumuloConfiguration config, long timerInitialDelay, long timerPeriod) {
this.path = path;
this.config = config;
+ this.timerInitialDelay = timerInitialDelay;
+ this.timerPeriod = timerPeriod;
}
public void startProcessing(final Processor processor, ThreadPoolExecutor executorService) throws KeeperException, InterruptedException {
@@ -196,6 +204,7 @@ public class DistributedWorkQueue {
SimpleTimer.getInstance(config).schedule(new Runnable() {
@Override
public void run() {
+ log.debug("Looking for work in " + path);
try {
lookForWork(processor, zoo.getChildren(path));
} catch (KeeperException e) {
@@ -204,7 +213,7 @@ public class DistributedWorkQueue {
log.info("Interrupted looking for work", e);
}
}
- }, r.nextInt(60 * 1000), 60 * 1000);
+ }, timerInitialDelay, timerPeriod);
}
/**
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index 481b3e8..149af6e 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -123,6 +123,8 @@ public class ReplicationProcessor implements Processor {
// Replicate that sucker
Status replicatedStatus = replica.replicate(filePath, status, target);
+ log.debug("Completed replication of {} to {}, with new Status [{}]", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
// If we got a different status
if (!replicatedStatus.equals(status)) {
// We actually did some work!
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index 01258de..a223511 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -21,16 +21,21 @@ import java.util.concurrent.ThreadPoolExecutor;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.server.fs.VolumeManager;
import org.apache.accumulo.server.security.SystemCredentials;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
*/
public class ReplicationWorker implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationWorker.class);
private Instance inst;
private VolumeManager fs;
@@ -49,8 +54,22 @@ public class ReplicationWorker implements Runnable {
@Override
public void run() {
+ DefaultConfiguration defaultConf = DefaultConfiguration.getDefaultConfiguration();
+ long defaultDelay = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
+ long defaultPeriod = defaultConf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
+ long delay = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_DELAY);
+ long period = conf.getTimeInMillis(Property.REPLICATION_WORK_PROCESSOR_PERIOD);
try {
- new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf).startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get()), executor);
+ DistributedWorkQueue workQueue;
+ if (defaultDelay != delay && defaultPeriod != period) {
+ log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}", delay, period);
+ workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf, delay, period);
+ } else {
+ log.debug("Configuring DistributedWorkQueue with default delay and period");
+ workQueue = new DistributedWorkQueue(ZooUtil.getRoot(inst) + Constants.ZREPLICATION_WORK_QUEUE, conf);
+ }
+
+ workQueue.startProcessing(new ReplicationProcessor(inst, conf, fs, SystemCredentials.get()), executor);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
index 23da719..2a7a210 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationWithGCIT.java
@@ -83,6 +83,8 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
cfg.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
cfg.setProperty(Property.REPLICATION_NAME, "master");
+ cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_DELAY, "1s");
+ cfg.setProperty(Property.REPLICATION_WORK_PROCESSOR_PERIOD, "1s");
cfg.setNumTservers(1);
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@@ -331,6 +333,11 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
@Test(timeout = 5 * 60 * 1000)
public void replicatedStatusEntriesAreDeleted() throws Exception {
+ // Just stop it now, we'll restart it after we restart the tserver
+ for (ProcessReference proc : getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR)) {
+ getCluster().killProcess(ServerType.GARBAGE_COLLECTOR, proc);
+ }
+
final Connector conn = getConnector();
log.info("Got connector to MAC");
String table1 = "table1";
@@ -470,7 +477,12 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
boolean foundResults = false;
for (int i = 0; i < 5; i++) {
s = ReplicationTable.getScanner(conn);
- if (Iterables.size(s) > 0) {
+ int count = 0;
+ for (Entry<Key,Value> entry : s) {
+ count++;
+ log.info("{}={}", entry.getKey().toStringNoTruncate(), entry.getValue());
+ }
+ if (count > 0) {
foundResults = true;
break;
}
@@ -479,9 +491,16 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
Assert.assertTrue("Did not find any replication entries in the replication table", foundResults);
+ getCluster().exec(SimpleGarbageCollector.class);
+
+ // Wait for a bit since the GC has to run (should be running after a one second delay)
+ Thread.sleep(5000);
+
// We expect no records in the metadata table after compaction. We have to poll
// because we have to wait for the StatusMaker's next iteration which will clean
- // up the dangling record after we create the record in the replication table
+ // up the dangling *closed* records after we create the record in the replication table.
+ // We need the GC to close the file (CloseWriteAheadLogReferences) before we can remove the record
+ log.info("Checking metadata table for replication entries");
foundResults = true;
for (int i = 0; i < 5; i++) {
s = conn.createScanner(MetadataTable.NAME, Authorizations.EMPTY);
@@ -493,16 +512,13 @@ public class ReplicationWithGCIT extends ConfigurableMacIT {
Thread.sleep(1000);
}
- Assert.assertFalse("Replication status messages were not cleaned up from metadata table, check why the StatusMaker didn't delete them", foundResults);
+ Assert.assertFalse("Replication status messages were not cleaned up from metadata table", foundResults);
/**
- * After we set the begin to Long.MAX_VALUE, the RemoveCompleteReplicationRecords class will start deleting the records which have been closed by
- * the minor compaction and replicated by the MockReplicaSystem
+ * After we close out and subsequently delete the metadata record, this will propagate to the replication table,
+ * which will cause those records to be deleted after replication occurs
*/
- // Wait for a bit since the GC has to run (should be running after a one second delay)
- Thread.sleep(5000);
-
int recordsFound = 0;
for (int i = 0; i < 10; i++) {
s = ReplicationTable.getScanner(conn);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/c3d99ec7/test/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/test/src/test/resources/log4j.properties b/test/src/test/resources/log4j.properties
index 407bc28..9f991d1 100644
--- a/test/src/test/resources/log4j.properties
+++ b/test/src/test/resources/log4j.properties
@@ -35,7 +35,7 @@ log4j.logger.org.apache.accumulo.core.file.rfile.bcfile=INFO
log4j.logger.org.apache.accumulo.server.util.ReplicationTableUtil=TRACE
log4j.logger.org.apache.accumulo.core.client.impl.TabletServerBatchReaderIterator=INFO
log4j.logger.org.apache.accumulo.core.client.impl.ThriftScanner=INFO
-log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
+#log4j.logger.org.apache.accumulo.server.zookeeper.DistributedWorkQueue=INFO
log4j.logger.org.apache.accumulo.fate.zookeeper.DistributedReadWriteLock=WARN
log4j.logger.org.mortbay.log=WARN
log4j.logger.org.apache.hadoop=WARN