You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:26 UTC
[48/50] [abbrv] git commit: ACCUMULO-2583 Advertise peer master
coordinator service port in ZK.
ACCUMULO-2583 Advertise peer master coordinator service port in ZK.
Need to use the proper contact info for the peer master. Some more configuration
Properties for tweaking things. Better logging.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/e84879c8
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/e84879c8
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/e84879c8
Branch: refs/heads/ACCUMULO-378
Commit: e84879c8dd814e1f00c9b109f095a45015224c7f
Parents: 53fc90f
Author: Josh Elser <el...@apache.org>
Authored: Thu May 8 21:56:45 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 8 21:56:45 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/Constants.java | 1 +
.../core/client/impl/ReplicationClient.java | 39 +++++++++++++++++---
.../org/apache/accumulo/core/conf/Property.java | 10 ++++-
.../java/org/apache/accumulo/master/Master.java | 16 +++++++-
.../replication/ReplicationWorkAssigner.java | 7 +++-
.../apache/accumulo/tserver/TabletServer.java | 2 +-
.../replication/AccumuloReplicaSystem.java | 20 +++++++---
.../test/replication/ReplicationIT.java | 34 +++++++++++------
8 files changed, 100 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index c87119c..f230690 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -43,6 +43,7 @@ public class Constants {
public static final String ZMASTERS = "/masters";
public static final String ZMASTER_LOCK = ZMASTERS + "/lock";
public static final String ZMASTER_GOAL_STATE = ZMASTERS + "/goal_state";
+ public static final String ZMASTER_REPLICATION_COORDINATOR_PORT = ZMASTERS + "/repl_coord_port";
public static final String ZGC = "/gc";
public static final String ZGC_LOCK = ZGC + "/lock";
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index df12ae8..65565f8 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -20,22 +20,29 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
import java.util.List;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException;
-import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.util.UtilWaitThread;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.zookeeper.ZooReader;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransportException;
+import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.net.HostAndPort;
+
public class ReplicationClient {
private static final Logger log = LoggerFactory.getLogger(ReplicationClient.class);
@@ -65,21 +72,41 @@ public class ReplicationClient {
return null;
}
- String master = locations.get(0);
- if (master.endsWith(":0"))
+ // This is the master thrift service, we just want the hostname, not the port
+ String masterThriftService = locations.get(0);
+ if (masterThriftService.endsWith(":0"))
+ return null;
+
+
+ AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance);
+
+ HostAndPort masterAddr = HostAndPort.fromString(masterThriftService);
+ String zkPath = ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_PORT;
+ String replCoordinatorPort;
+
+ // Get the coordinator port for the master we're trying to connect to
+ try {
+ ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
+ replCoordinatorPort = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8);
+ } catch (KeeperException | InterruptedException e) {
+ log.debug("Could not fetch remote coordinator port");
return null;
+ }
+
+ // Throw the hostname and port through HostAndPort to get some normalization
+ HostAndPort coordinatorAddr = HostAndPort.fromParts(masterAddr.getHostText(), Integer.parseInt(replCoordinatorPort));
try {
// Master requests can take a long time: don't ever time out
- ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), master,
- ServerConfigurationUtil.getConfiguration(instance));
+ ReplicationCoordinator.Client client = ThriftUtil.getClientNoTimeout(new ReplicationCoordinator.Client.Factory(), coordinatorAddr.toString(),
+ conf);
return client;
} catch (TTransportException tte) {
if (tte.getCause().getClass().equals(UnknownHostException.class)) {
// do not expect to recover from this
throw new RuntimeException(tte);
}
- log.debug("Failed to connect to master={}, will retry... ", master, tte);
+ log.debug("Failed to connect to master coordinator service ({}), will retry... ", coordinatorAddr.toString(), tte);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index d611bd5..a87b1b4 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -182,6 +182,12 @@ public enum Property {
@Experimental
MASTER_REPLICATION_SCAN_INTERVAL("master.replication.status.scan.interval", "30s", PropertyType.TIMEDURATION,
"Amount of time to sleep before scanning the status section of the replication table for new data"),
+ @Experimental
+ MASTER_REPLICATION_COORDINATOR_PORT("master.replication.coordinator.port", "10001", PropertyType.PORT, "Port for the replication coordinator service"),
+ @Experimental
+ MASTER_REPLICATION_COORDINATOR_MINTHREADS("master.replication.coordinator.minthreads", "4", PropertyType.COUNT, "Minimum number of threads dedicated to answering coordinator requests"),
+ @Experimental
+ MASTER_REPLICATION_COORDINATOR_THREADCHECK("master.replication.coordinator.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the coordinator thread pool"),
// properties that are specific to tablet server behavior
TSERV_PREFIX("tserver.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the tablet servers"),
@@ -457,13 +463,15 @@ public enum Property {
@Experimental
REPLICATION_WORKER_THREADS("replication.worker.threads", "4", PropertyType.COUNT, "Size of the threadpool that each tabletserver devotes to replicating data"),
@Experimental
- REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10001", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
+ REPLICATION_RECEIPT_SERVICE_PORT("replication.receipt.service.port", "10002", PropertyType.PORT, "Listen port used by thrift service in tserver listening for replication"),
@Experimental
REPLICATION_WORK_ATTEMPTS("replication.work.attempts", "10", PropertyType.COUNT, "Number of attempts to try to replicate some data before giving up and letting it naturally be retried later"),
@Experimental
REPLICATION_MIN_THREADS("replication.receiver.min.threads", "1", PropertyType.COUNT, "Minimum number of threads for replciation"),
@Experimental
REPLICATION_THREADCHECK("replication.receiver.threadcheck.time", "5s", PropertyType.TIMEDURATION, "The time between adjustments of the replication thread pool."),
+ @Experimental
+ REPLICATION_MAX_UNIT_SIZE("replication.max.unit.size", "64M", PropertyType.MEMORY, "Maximum size of data to send in a replication message"),
;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/master/src/main/java/org/apache/accumulo/master/Master.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 0eac9ab..b0745eb 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -60,6 +60,7 @@ import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.MetadataTable;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
+import org.apache.accumulo.core.replication.thrift.ReplicationCoordinator;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.security.NamespacePermission;
@@ -76,6 +77,7 @@ import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.master.recovery.RecoveryManager;
+import org.apache.accumulo.master.replication.MasterReplicationCoordinator;
import org.apache.accumulo.master.replication.ReplicationDriver;
import org.apache.accumulo.master.replication.ReplicationWorkAssigner;
import org.apache.accumulo.master.state.TableCounts;
@@ -977,8 +979,6 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
throw new IOException(e);
}
-
-
Processor<Iface> processor = new Processor<Iface>(TraceWrap.service(new MasterClientServiceHandler(this)));
ServerAddress sa = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_CLIENTPORT, processor, "Master",
"Master Client Service Handler", null, Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
@@ -990,6 +990,17 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
while (!clientService.isServing()) {
UtilWaitThread.sleep(100);
}
+
+ // Start the replication coordinator which assigns tservers to service replication requests
+ ReplicationCoordinator.Processor<ReplicationCoordinator.Iface> replicationCoordinatorProcessor = new ReplicationCoordinator.Processor<ReplicationCoordinator.Iface>(
+ TraceWrap.service(new MasterReplicationCoordinator(this, getSystemConfiguration())));
+ ServerAddress replAddress = TServerUtils.startServer(getSystemConfiguration(), hostname, Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, "Master Replication Coordinator",
+ "Replication Coordinator", null, Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
+
+ // Advertise that port we used so peers don't have to be told what it is
+ ZooReaderWriter.getInstance().putPersistentData(ZooUtil.getRoot(instance) + Constants.ZMASTER_REPLICATION_COORDINATOR_PORT,
+ Integer.toString(replAddress.address.getPort()).getBytes(StandardCharsets.UTF_8), NodeExistsPolicy.OVERWRITE);
+
while (clientService.isServing()) {
UtilWaitThread.sleep(500);
}
@@ -1000,6 +1011,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt
statusThread.join(remaining(deadline));
replicationWorkAssigner.join(remaining(deadline));
replicationWorkDriver.join(remaining(deadline));
+ replAddress.server.stop();
// quit, even if the tablet servers somehow jam up and the watchers
// don't stop
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
index 24842a9..1dd20da 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -152,6 +152,8 @@ public class ReplicationWorkAssigner extends Daemon {
@Override
public void run() {
+ log.info("Starting replication work assignment thread");
+
while (master.stillMaster()) {
if (null == conf) {
conf = master.getConfiguration().getConfiguration();
@@ -178,7 +180,9 @@ public class ReplicationWorkAssigner extends Daemon {
// Keep the state of the work we queued correct
cleanupFinishedWork();
- UtilWaitThread.sleep(conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP));
+ long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
+ log.debug("Sleeping {} ms", sleepTime);
+ UtilWaitThread.sleep(sleepTime);
}
}
@@ -205,6 +209,7 @@ public class ReplicationWorkAssigner extends Daemon {
// to add more work entries
if (queuedWork.size() > maxQueueSize) {
log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
+ UtilWaitThread.sleep(5000);
return;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 803419c..0b5c6bf 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3122,7 +3122,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
}
private HostAndPort startReplicationService() throws UnknownHostException {
- ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler());
+ ReplicationServicer.Iface repl = TraceWrap.service(new ReplicationServicerHandler(this));
ReplicationServicer.Processor<ReplicationServicer.Iface> processor = new ReplicationServicer.Processor<ReplicationServicer.Iface>(repl);
AccumuloConfiguration conf = getSystemConfiguration();
Property maxMessageSizeProperty = (conf.get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
index 40676f7..f275cdb 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.java
@@ -131,21 +131,29 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
Long entriesReplicated;
//TODO should chunk up the given file into some configurable sizes instead of just sending the entire file all at once
// configuration should probably just be size based.
- final long sizeLimit = Long.MAX_VALUE;
+ final long sizeLimit = conf.getMemoryInBytes(Property.REPLICATION_MAX_UNIT_SIZE);
try {
entriesReplicated = ReplicationClient.executeServicerWithReturn(peerInstance, peerTserver, new ClientExecReturn<Long,ReplicationServicer.Client>() {
@Override
public Long execute(Client client) throws Exception {
// RFiles have an extension, call everything else a WAL
if (p.getName().endsWith(RFILE_SUFFIX)) {
- return client.replicateKeyValues(remoteTableId, getKeyValues(p, status, sizeLimit));
+ KeyValues kvs = getKeyValues(p, status, sizeLimit);
+ if (0 < kvs.getKeyValuesSize()) {
+ return client.replicateKeyValues(remoteTableId, kvs);
+ }
} else {
- return client.replicateLog(remoteTableId, getWalEdits(p, status, sizeLimit));
+ WalEdits edits = getWalEdits(p, status, sizeLimit);
+ if (0 < edits.getEditsSize()) {
+ return client.replicateLog(remoteTableId, edits);
+ }
}
+
+ return 0l;
}
});
- log.debug("Replicated {} entries from {} to {} which is a part of {}", entriesReplicated, p, peerTserver, peerInstance.getInstanceName());
+ log.debug("Replicated {} entries from {} to {} which is a member of the peer '{}'", entriesReplicated, p, peerTserver, peerInstance.getInstanceName());
// Update the begin to account for what we replicated
Status updatedStatus = Status.newBuilder(status).setBegin(status.getBegin() + entriesReplicated).build();
@@ -195,7 +203,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
key.readFields(wal);
value.readFields(wal);
} catch (EOFException e) {
- log.trace("Caught EOFException, no more data to replicate");
+ log.debug("Caught EOFException, no more data to replicate");
break;
}
@@ -217,7 +225,7 @@ public class AccumuloReplicaSystem implements ReplicaSystem {
}
}
- log.debug("Returning {} bytes of WAL entries for replication for {}", size, p);
+ log.debug("Binned {} bytes of WAL entries for replication to peer '{}'", size, p);
return edits;
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/e84879c8/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
index 0d4099c..0ad8066 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/ReplicationIT.java
@@ -33,9 +33,12 @@ import org.apache.accumulo.server.replication.ReplicationTable;
import org.apache.accumulo.test.functional.ConfigurableMacIT;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.Assert;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.google.protobuf.TextFormat;
@@ -43,13 +46,16 @@ import com.google.protobuf.TextFormat;
*
*/
public class ReplicationIT extends ConfigurableMacIT {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationIT.class);
@Override
public void configure(MiniAccumuloConfigImpl cfg, Configuration hadoopCoreSite) {
cfg.setNumTservers(1);
- cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "1M");
+ cfg.setProperty(Property.TSERV_WALOG_MAX_SIZE, "32M");
cfg.setProperty(Property.GC_CYCLE_START, "1s");
- cfg.setProperty(Property.GC_CYCLE_DELAY, "1s");
+ cfg.setProperty(Property.GC_CYCLE_DELAY, "5s");
+ cfg.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "5s");
+ cfg.setProperty(Property.REPLICATION_MAX_UNIT_SIZE, "8M");
hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
}
@@ -59,7 +65,8 @@ public class ReplicationIT extends ConfigurableMacIT {
ROOT_PASSWORD);
peerCfg.setNumTservers(1);
peerCfg.setInstanceName("peer");
- peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10002");
+ peerCfg.setProperty(Property.MASTER_REPLICATION_COORDINATOR_PORT, "10003");
+ peerCfg.setProperty(Property.REPLICATION_RECEIPT_SERVICE_PORT, "10004");
MiniAccumuloClusterImpl peerCluster = peerCfg.build();
peerCluster.start();
@@ -89,13 +96,15 @@ public class ReplicationIT extends ConfigurableMacIT {
// Write some data to table1
BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
- for (int rows = 0; rows < 250; rows++) {
- Mutation m = new Mutation(Integer.toString(rows));
- for (int cols = 0; cols < 500; cols++) {
- String value = Integer.toString(cols);
- m.put(value, "", value);
+ for (int i = 0; i < 100; i++) {
+ for (int rows = 0; rows < 1000; rows++) {
+ Mutation m = new Mutation(i + "_" + Integer.toString(rows));
+ for (int cols = 0; cols < 400; cols++) {
+ String value = Integer.toString(cols);
+ m.put(value, "", value);
+ }
+ bw.addMutation(m);
}
- bw.addMutation(m);
}
bw.close();
@@ -104,17 +113,18 @@ public class ReplicationIT extends ConfigurableMacIT {
Thread.sleep(500);
}
+ connMaster.tableOperations().compact(masterTable, null, null, true, false);
for (int i = 0; i < 10; i++) {
-
Scanner s = ReplicationTable.getScanner(connMaster);
for (Entry<Key,Value> e : s) {
- log.info(e.getKey().toStringNoTruncate() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
+ Path p = new Path(e.getKey().getRow().toString());
+ log.info(p.getName() + " " + e.getKey().getColumnFamily() + " " + e.getKey().getColumnQualifier() + " " + TextFormat.shortDebugString(Status.parseFrom(e.getValue().get())));
}
log.info("");
log.info("");
- Thread.sleep(1000);
+ Thread.sleep(3000);
}
peerCluster.stop();