You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/23 00:37:45 UTC
[1/3] git commit: ACCUMULO-378 Need to turn on Trace for traces to be logged.
Repository: accumulo
Updated Branches:
refs/heads/ACCUMULO-378 e798d5008 -> 0f6b6d0fb
ACCUMULO-378 Need to turn on Trace for traces to be logged.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/4288bbe4
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/4288bbe4
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/4288bbe4
Branch: refs/heads/ACCUMULO-378
Commit: 4288bbe442394af8512cd0d801dbb98d76d10b46
Parents: e798d50
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 17:15:00 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 17:15:29 2014 -0400
----------------------------------------------------------------------
.../accumulo/master/replication/ReplicationDriver.java | 11 +++++++++++
1 file changed, 11 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/4288bbe4/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index 3069c97..b340009 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -24,6 +24,9 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.util.Daemon;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.master.Master;
+import org.apache.accumulo.trace.instrument.CountSampler;
+import org.apache.accumulo.trace.instrument.Sampler;
+import org.apache.accumulo.trace.instrument.Trace;
import org.apache.log4j.Logger;
/**
@@ -50,6 +53,8 @@ public class ReplicationDriver extends Daemon {
@Override
public void run() {
+ Sampler sampler = new CountSampler(10);
+
while (master.stillMaster()) {
if (null == workMaker) {
try {
@@ -67,6 +72,10 @@ public class ReplicationDriver extends Daemon {
rcrr = new RemoveCompleteReplicationRecords(conn);
}
+ if (sampler.next()) {
+ Trace.on("masterReplicationDriver");
+ }
+
// Make status markers from replication records in metadata, removing entries in
// metadata which are no longer needed (closed records)
// This will end up creating the replication table too
@@ -83,6 +92,8 @@ public class ReplicationDriver extends Daemon {
// So it's important that we run these sequentially and not concurrently
rcrr.run();
+ Trace.offNoFlush();
+
// Sleep for a bit
UtilWaitThread.sleep(conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL));
}
[2/3] git commit: ACCUMULO-378 Better logging to whom the master is connecting
Posted by el...@apache.org.
ACCUMULO-378 Better logging to whom the master is connecting
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/327b0abc
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/327b0abc
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/327b0abc
Branch: refs/heads/ACCUMULO-378
Commit: 327b0abc0dfcdf17422effd2f797fcfc519b6373
Parents: 4288bbe
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 18:32:53 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 18:32:53 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/client/impl/ReplicationClient.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/327b0abc/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index 6e36759..02ae3d0 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -83,6 +83,8 @@ public class ReplicationClient {
String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_ADDR;
String replCoordinatorAddr;
+ log.debug("Using ZooKeeper quorum at {} with path {} to find peer Master information", instance.getZooKeepers(), zkPath);
+
// Get the coordinator port for the master we're trying to connect to
try {
ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
@@ -95,6 +97,8 @@ public class ReplicationClient {
// Throw the hostname and port through HostAndPort to get some normalization
HostAndPort coordinatorAddr = HostAndPort.fromString(replCoordinatorAddr);
+ log.debug("Connecting to master at {}", coordinatorAddr.toString());
+
try {
// Master requests can take a long time: don't ever time out
ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(),
[3/3] git commit: ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.
Posted by el...@apache.org.
ACCUMULO-2834 First attempt at immediately requeueing a file for replication when more work is needed.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0f6b6d0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0f6b6d0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0f6b6d0f
Branch: refs/heads/ACCUMULO-378
Commit: 0f6b6d0fb091440ad4780db1cf3c11f4a606de59
Parents: 327b0ab
Author: Josh Elser <el...@apache.org>
Authored: Thu May 22 18:33:11 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 22 18:33:11 2014 -0400
----------------------------------------------------------------------
.../replication/ReplicationProcessor.java | 56 ++++++++++++--------
.../replication/ReplicationProcessorTest.java | 49 +++++++++++++++++
2 files changed, 84 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
index d451991..ab939c5 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationProcessor.java
@@ -119,27 +119,9 @@ public class ReplicationProcessor implements Processor {
}
log.debug("Replicating {} to {} using {}", filePath, target, replica.getClass().getName());
-
- // Replicate that sucker
- Status replicatedStatus = replica.replicate(filePath, status, target);
-
- log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
-
- // If we got a different status
- if (!replicatedStatus.equals(status)) {
- // We actually did some work!
- recordNewStatus(filePath, replicatedStatus, target);
- return;
- }
-
- log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(status),
- TextFormat.shortDebugString(replicatedStatus));
-
- // otherwise, we didn't actually replicate because there was error sending the data
- // we can just not record any updates, and it will be picked up again by the work assigner
}
- public String getPeerType(String peerName) {
+ protected String getPeerType(String peerName) {
// Find the configured replication peer so we know how to replicate to it
Map<String,String> configuredPeers = conf.getAllPropertiesWithPrefix(Property.REPLICATION_PEERS);
String peerType = configuredPeers.get(Property.REPLICATION_PEERS.getKey() + peerName);
@@ -152,7 +134,7 @@ public class ReplicationProcessor implements Processor {
return peerType;
}
- public Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
+ protected Status getStatus(String file, ReplicationTarget target) throws TableNotFoundException, AccumuloException, AccumuloSecurityException,
InvalidProtocolBufferException {
Scanner s = ReplicationTable.getScanner(inst.getConnector(creds.getPrincipal(), creds.getToken()));
s.setRange(Range.exact(file));
@@ -161,6 +143,38 @@ public class ReplicationProcessor implements Processor {
return Status.parseFrom(Iterables.getOnlyElement(s).getValue().get());
}
+ /**
+ * Replicate the given file to the target, looping for as long as each call to the ReplicaSystem makes progress and
+ * the returned Status still requires work. Every new Status is persisted via {@link #recordNewStatus}.
+ *
+ * @param replica
+ * ReplicaSystem used to send the data
+ * @param filePath
+ * File being replicated
+ * @param status
+ * Replication Status of the file for this target before this call
+ * @param target
+ * Peer to replicate to
+ */
+ protected void replicate(ReplicaSystem replica, Path filePath, Status status, ReplicationTarget target) {
+ Status currentStatus = status;
+ while (true) {
+ // Replicate that sucker
+ Status replicatedStatus = replica.replicate(filePath, currentStatus, target);
+
+ log.debug("Completed replication of {} to {}, with new Status {}", filePath, target, ProtobufUtil.toString(replicatedStatus));
+
+ // Progress is judged against the Status we just sent. Comparing against an older Status would make a stalled
+ // attempt look like progress, re-recording the same Status and retrying the peer one extra time.
+ if (replicatedStatus.equals(currentStatus)) {
+ log.debug("Did not replicate any new data for {} to {}, (was {}, now is {})", filePath, target, TextFormat.shortDebugString(currentStatus),
+ TextFormat.shortDebugString(replicatedStatus));
+
+ // We didn't actually replicate because there was error sending the data
+ // we can just not record any updates, and it will be picked up again by the work assigner
+ return;
+ }
+
+ // We actually did some work!
+ recordNewStatus(filePath, replicatedStatus, target);
+
+ // If we don't have any more work, just quit
+ if (!StatusUtil.isWorkRequired(replicatedStatus)) {
+ return;
+ }
+
+ // Otherwise, loop and replicate more data, starting from the Status we just recorded
+ currentStatus = replicatedStatus;
+ }
+ }
+
/**
* Record the updated Status for this file and target
*
@@ -171,7 +185,7 @@ public class ReplicationProcessor implements Processor {
* @param target
* Peer that was replicated to
*/
- public void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
+ protected void recordNewStatus(Path filePath, Status status, ReplicationTarget target) {
try {
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
BatchWriter bw = ReplicationTable.getBatchWriter(conn);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0f6b6d0f/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
index df4845c..c88e091 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/replication/ReplicationProcessorTest.java
@@ -20,11 +20,15 @@ import java.util.HashMap;
import java.util.Map;
import org.apache.accumulo.core.client.Instance;
+import org.apache.accumulo.core.client.replication.ReplicaSystem;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.server.fs.VolumeManager;
+import org.apache.hadoop.fs.Path;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
@@ -65,4 +69,49 @@ public class ReplicationProcessorTest {
proc.getPeerType("foo");
}
+
+ @Test
+ public void filesContinueReplicationWhenMoreDataIsPresent() throws Exception {
+ // Two successive replicate calls each return a new Status: the processor is expected to record both of them
+ // and make no third replicate call (the mocks would reject an unexpected call)
+ ReplicaSystem replicaSystem = EasyMock.createMock(ReplicaSystem.class);
+ ReplicationProcessor processor = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+ ReplicationTarget peer = new ReplicationTarget("peer", "1", "1");
+ Path file = new Path("/accumulo");
+ Status initial = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+ Status partial = Status.newBuilder().setBegin(100).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+ Status finished = Status.newBuilder().setBegin(Long.MAX_VALUE).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+
+ // First pass: begin advances 0 -> 100
+ EasyMock.expect(replicaSystem.replicate(file, initial, peer)).andReturn(partial);
+ processor.recordNewStatus(file, partial, peer);
+ EasyMock.expectLastCall().times(1);
+
+ // Second pass: begin advances 100 -> Long.MAX_VALUE
+ EasyMock.expect(replicaSystem.replicate(file, partial, peer)).andReturn(finished);
+ processor.recordNewStatus(file, finished, peer);
+ EasyMock.expectLastCall().times(1);
+
+ EasyMock.replay(replicaSystem, processor);
+ processor.replicate(replicaSystem, file, initial, peer);
+ EasyMock.verify(replicaSystem, processor);
+ }
+
+ @Test
+ public void filesWhichMakeNoProgressArentReplicatedAgain() throws Exception {
+ // The ReplicaSystem hands back exactly the Status it was given (no progress). No recordNewStatus expectation is
+ // registered and only one replicate call is expected, so any record or retry fails verification.
+ ReplicaSystem replicaSystem = EasyMock.createMock(ReplicaSystem.class);
+ ReplicationProcessor processor = EasyMock.createMockBuilder(ReplicationProcessor.class).addMockedMethod("recordNewStatus").createMock();
+
+ ReplicationTarget peer = new ReplicationTarget("peer", "1", "1");
+ Path file = new Path("/accumulo");
+ Status unchanged = Status.newBuilder().setBegin(0).setEnd(0).setInfiniteEnd(true).setClosed(true).build();
+
+ EasyMock.expect(replicaSystem.replicate(file, unchanged, peer)).andReturn(unchanged);
+
+ EasyMock.replay(replicaSystem, processor);
+ processor.replicate(replicaSystem, file, unchanged, peer);
+ EasyMock.verify(replicaSystem, processor);
+ }
}