You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/06/14 06:55:32 UTC
[32/51] [abbrv] git commit: ACCUMULO-378 Core review fixes from
bhavanki for replication
ACCUMULO-378 Core review fixes from bhavanki for replication
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/84e94a42
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/84e94a42
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/84e94a42
Branch: refs/heads/master
Commit: 84e94a429bd92e469156642b1bfd69c422759e2d
Parents: 2f02d69
Author: Josh Elser <el...@apache.org>
Authored: Wed Jun 4 13:52:58 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed Jun 4 13:52:58 2014 -0400
----------------------------------------------------------------------
.../client/admin/ReplicationOperations.java | 12 ++---
.../core/client/impl/ReplicationClient.java | 34 +++++++-----
.../client/impl/ReplicationOperationsImpl.java | 52 ++++++++----------
.../replication/PeerNotFoundException.java | 4 ++
.../core/client/replication/ReplicaSystem.java | 3 +-
.../replication/ReplicaSystemFactory.java | 6 ++-
.../org/apache/accumulo/core/data/Mutation.java | 13 +++--
.../master/replication/ReplicationDriver.java | 13 +++--
.../test/replication/CyclicReplicationIT.java | 43 +++++++--------
.../UnorderedWorkAssignerReplicationIT.java | 57 ++++++++++----------
10 files changed, 128 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
index 1d20f79..5873f73 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/admin/ReplicationOperations.java
@@ -32,14 +32,14 @@ import org.apache.accumulo.core.client.replication.ReplicaSystem;
public interface ReplicationOperations {
/**
- * Define a cluster with the given name using the given {@link ReplicaSystem}
+ * Defines a cluster with the given name using the given {@link ReplicaSystem}.
* @param name Name of the cluster, used for configuring replication on tables
* @param system Type of system to be replicated to
*/
public void addPeer(String name, ReplicaSystem system) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
/**
- * Define a cluster with the given name and the given name system
+ * Defines a cluster with the given name and the given name system.
* @param name Unique name for the cluster
* @param replicaType {@link ReplicaSystem} class name to use to replicate the data
* @throws PeerExistsException
@@ -47,14 +47,14 @@ public interface ReplicationOperations {
public void addPeer(String name, String replicaType) throws AccumuloException, AccumuloSecurityException, PeerExistsException;
/**
- * Remove a cluster with the given name
+ * Removes a cluster with the given name.
* @param name Name of the cluster to remove
* @throws PeerNotFoundException
*/
public void removePeer(String name) throws AccumuloException, AccumuloSecurityException, PeerNotFoundException;
/**
- * Wait for a table to be fully replicated
+ * Waits for a table to be fully replicated.
* @param tableName The table to wait for
* @throws AccumuloException
* @throws AccumuloSecurityException
@@ -62,7 +62,7 @@ public interface ReplicationOperations {
public void drain(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
/**
- * Wait for a table to be fully replicated as determined by the provided tables
+ * Waits for a table to be fully replicated as determined by the provided files.
* @param tableName The table to wait for
* @throws AccumuloException
* @throws AccumuloSecurityException
@@ -70,7 +70,7 @@ public interface ReplicationOperations {
public void drain(String tableName, Set<String> files) throws AccumuloException, AccumuloSecurityException, TableNotFoundException;
/**
- * Get all of the referenced files for a table
+ * Gets all of the referenced files for a table.
* @param tableName
* @throws TableNotFoundException
*/
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
index d7b12c7..13c027a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationClient.java
@@ -16,7 +16,6 @@
*/
package org.apache.accumulo.core.client.impl;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import java.net.UnknownHostException;
@@ -52,14 +51,19 @@ public class ReplicationClient {
* @return Client to the ReplicationCoordinator service
*/
public static ReplicationCoordinator.Client getCoordinatorConnectionWithRetry(Instance instance) throws AccumuloException {
- checkArgument(instance != null, "instance is null");
+ checkNotNull(instance);
for (int attempts = 1; attempts <= 10; attempts++) {
ReplicationCoordinator.Client result = getCoordinatorConnection(instance);
if (result != null)
return result;
- UtilWaitThread.sleep(attempts * 250);
+ log.debug("Could not get ReplicationCoordinator connection to {}, will retry", instance.getInstanceName());
+ try {
+ Thread.sleep(attempts * 250);
+ } catch (InterruptedException e) {
+ throw new AccumuloException(e);
+ }
}
throw new AccumuloException("Timed out trying to communicate with master from " + instance.getInstanceName());
@@ -69,14 +73,16 @@ public class ReplicationClient {
List<String> locations = instance.getMasterLocations();
if (locations.size() == 0) {
- log.debug("No masters...");
+ log.debug("No masters for replication to instance {}", instance.getInstanceName());
return null;
}
// This is the master thrift service, we just want the hostname, not the port
String masterThriftService = locations.get(0);
- if (masterThriftService.endsWith(":0"))
+ if (masterThriftService.endsWith(":0")) {
+ log.warn("Master found for {} did not have real location {}", instance.getInstanceName(), masterThriftService);
return null;
+ }
AccumuloConfiguration conf = ServerConfigurationUtil.getConfiguration(instance);
@@ -91,7 +97,7 @@ public class ReplicationClient {
ZooReader reader = new ZooReader(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut());
replCoordinatorAddr = new String(reader.getData(zkPath, null), StandardCharsets.UTF_8);
} catch (KeeperException | InterruptedException e) {
- log.error("Could not fetch remote coordinator port");
+ log.error("Could not fetch remote coordinator port", e);
return null;
}
@@ -106,11 +112,7 @@ public class ReplicationClient {
conf);
return client;
} catch (TTransportException tte) {
- if (tte.getCause().getClass().equals(UnknownHostException.class)) {
- // do not expect to recover from this
- throw new RuntimeException(tte);
- }
- log.debug("Failed to connect to master coordinator service ({}), will retry... ", coordinatorAddr.toString(), tte);
+ log.debug("Failed to connect to master coordinator service ({})", coordinatorAddr.toString(), tte);
return null;
}
}
@@ -157,13 +159,17 @@ public class ReplicationClient {
public static <T> T executeCoordinatorWithReturn(Instance instance, ClientExecReturn<T,ReplicationCoordinator.Client> exec) throws AccumuloException,
AccumuloSecurityException {
ReplicationCoordinator.Client client = null;
- while (true) {
+ for (int i = 0; i < 10; i++) {
try {
client = getCoordinatorConnectionWithRetry(instance);
return exec.execute(client);
} catch (TTransportException tte) {
log.debug("ReplicationClient coordinator request failed, retrying ... ", tte);
- UtilWaitThread.sleep(100);
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ throw new AccumuloException(e);
+ }
} catch (ThriftSecurityException e) {
throw new AccumuloSecurityException(e.user, e.code, e);
} catch (AccumuloException e) {
@@ -175,6 +181,8 @@ public class ReplicationClient {
close(client);
}
}
+
+ throw new AccumuloException("Could not connect to ReplicationCoordinator at " + instance.getInstanceName());
}
public static void executeCoordinator(Instance instance, ClientExec<ReplicationCoordinator.Client> exec) throws AccumuloException, AccumuloSecurityException {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
index 4355867..51a5367 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ReplicationOperationsImpl.java
@@ -66,10 +66,12 @@ import com.google.protobuf.InvalidProtocolBufferException;
public class ReplicationOperationsImpl implements ReplicationOperations {
private static final Logger log = LoggerFactory.getLogger(ReplicationOperationsImpl.class);
- private Instance inst;
- private Credentials creds;
+ private final Instance inst;
+ private final Credentials creds;
public ReplicationOperationsImpl(Instance inst, Credentials creds) {
+ checkNotNull(inst);
+ checkNotNull(creds);
this.inst = inst;
this.creds = creds;
}
@@ -125,32 +127,16 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
checkNotNull(tableName);
Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
- TableOperations tops = conn.tableOperations();
- while (!tops.exists(ReplicationTable.NAME)) {
- UtilWaitThread.sleep(200);
- }
-
- if (!conn.tableOperations().exists(tableName)) {
- throw new TableNotFoundException(null, tableName, null);
- }
-
- String strTableId = null;
- while (null == strTableId) {
- strTableId = tops.tableIdMap().get(tableName);
- if (null == strTableId) {
- UtilWaitThread.sleep(200);
- }
- }
-
- Text tableId = new Text(strTableId);
+ Text tableId = getTableId(conn, tableName);
log.info("Waiting for {} to be replicated for {}", wals, tableId);
log.info("Reading from metadata table");
boolean allMetadataRefsReplicated = false;
+ final Set<Range> range = Collections.singleton(new Range(ReplicationSection.getRange()));
while (!allMetadataRefsReplicated) {
BatchScanner bs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- bs.setRanges(Collections.singleton(new Range(ReplicationSection.getRange())));
+ bs.setRanges(range);
bs.fetchColumnFamily(ReplicationSection.COLF);
try {
allMetadataRefsReplicated = allReferencesReplicated(bs, tableId, wals);
@@ -228,13 +214,7 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
return true;
}
- @Override
- public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
- checkNotNull(tableName);
-
- log.debug("Collecting referenced files for replication of table {}", tableName);
-
- Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+ protected Text getTableId(Connector conn, String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
TableOperations tops = conn.tableOperations();
while (!tops.exists(ReplicationTable.NAME)) {
UtilWaitThread.sleep(200);
@@ -252,13 +232,23 @@ public class ReplicationOperationsImpl implements ReplicationOperations {
}
}
- Text tableId = new Text(strTableId);
+ return new Text(strTableId);
+ }
+
+ @Override
+ public Set<String> referencedFiles(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException {
+ checkNotNull(tableName);
+
+ log.debug("Collecting referenced files for replication of table {}", tableName);
+
+ Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+ Text tableId = getTableId(conn, tableName);
- log.debug("Found id of {} for name {}", strTableId, tableName);
+ log.debug("Found id of {} for name {}", tableId, tableName);
// Get the WALs currently referenced by the table
BatchScanner metaBs = conn.createBatchScanner(MetadataTable.NAME, Authorizations.EMPTY, 4);
- metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(strTableId)));
+ metaBs.setRanges(Collections.singleton(MetadataSchema.TabletsSection.getRange(tableId.toString())));
metaBs.fetchColumnFamily(LogColumnFamily.NAME);
Set<String> wals = new HashSet<>();
try {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java b/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
index 1859c62..4e02218 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/PeerNotFoundException.java
@@ -33,4 +33,8 @@ public class PeerNotFoundException extends Exception {
public PeerNotFoundException(String message, Throwable cause) {
super(message, cause);
}
+
+ public PeerNotFoundException(String peer, String message, Throwable cause) {
+ super("Peer '" + peer + "' not found " + message, cause);
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
index e20d35f..cc51a11 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystem.java
@@ -31,6 +31,7 @@ public interface ReplicaSystem {
* @param p Path to the resource we're reading from
* @param status Information to replicate
* @param target The peer
+ * @param helper Instance of ReplicaSystemHelper
* @return A new Status for the progress that was made
*/
public Status replicate(Path p, Status status, ReplicationTarget target, ReplicaSystemHelper helper);
@@ -39,7 +40,7 @@ public interface ReplicaSystem {
* Configure the implementation with necessary information from the system configuration
* <p>
* For example, we only need one implementation for Accumulo, but, for each peer,
- * we have a ZK quorom and instance name
+ * we have a ZK quorum and instance name
* @param configuration
*/
public void configure(String configuration);
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
index d1df97e..164512a 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/replication/ReplicaSystemFactory.java
@@ -27,6 +27,8 @@ import com.google.common.base.Preconditions;
public class ReplicaSystemFactory {
private static final Logger log = LoggerFactory.getLogger(ReplicaSystemFactory.class);
+ private ReplicaSystemFactory() {}
+
/**
* @param value
* {@link ReplicaSystem} implementation class name
@@ -53,10 +55,10 @@ public class ReplicaSystemFactory {
return rs;
}
- throw new RuntimeException("Class is not assignable to ReplicaSystem: " + name);
+ throw new IllegalArgumentException("Class is not assignable to ReplicaSystem: " + name);
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
log.error("Error creating ReplicaSystem object", e);
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
index 619e522..a134ec8 100644
--- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
+++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java
@@ -784,6 +784,9 @@ public class Mutation implements Writable {
* @return An unmodifiable view of the replication sources
*/
public Set<String> getReplicationSources() {
+ if (null == replicationSources) {
+ return EMPTY;
+ }
return Collections.unmodifiableSet(replicationSources);
}
@@ -926,9 +929,13 @@ public class Mutation implements Writable {
}
}
if (0x02 == (0x02 & hasValues)) {
- WritableUtils.writeVInt(out, replicationSources.size());
- for (String source : replicationSources) {
- WritableUtils.writeString(out, source);
+ if (null == replicationSources) {
+ WritableUtils.writeVInt(out, 0);
+ } else {
+ WritableUtils.writeVInt(out, replicationSources.size());
+ for (String source : replicationSources) {
+ WritableUtils.writeString(out, source);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
index b340009..e98bc1d 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationDriver.java
@@ -27,13 +27,14 @@ import org.apache.accumulo.master.Master;
import org.apache.accumulo.trace.instrument.CountSampler;
import org.apache.accumulo.trace.instrument.Sampler;
import org.apache.accumulo.trace.instrument.Trace;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Daemon wrapper around the {@link WorkMaker} that separates it from the Master
*/
public class ReplicationDriver extends Daemon {
- private static final Logger log = Logger.getLogger(ReplicationDriver.class);
+ private static final Logger log = LoggerFactory.getLogger(ReplicationDriver.class);
private final Master master;
private final AccumuloConfiguration conf;
@@ -95,7 +96,13 @@ public class ReplicationDriver extends Daemon {
Trace.offNoFlush();
// Sleep for a bit
- UtilWaitThread.sleep(conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL));
+ long sleepMillis = conf.getTimeInMillis(Property.MASTER_REPLICATION_SCAN_INTERVAL);
+ log.debug("Sleeping for {}ms before re-running", sleepMillis);
+ try {
+ Thread.sleep(sleepMillis);
+ } catch (InterruptedException e) {
+ log.error("Interrupted while sleeping", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
index a03cfab..a75113b 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/CyclicReplicationIT.java
@@ -121,6 +121,7 @@ public class CyclicReplicationIT {
String master1UserName = "master1", master1Password = "foo";
String master2UserName = "master2", master2Password = "bar";
+ String master1Table = master1Cluster.getInstanceName(), master2Table = master2Cluster.getInstanceName();
connMaster1.securityOperations().createLocalUser(master1UserName, new PasswordToken(master1Password));
connMaster2.securityOperations().createLocalUser(master2UserName, new PasswordToken(master2Password));
@@ -142,27 +143,27 @@ public class CyclicReplicationIT {
ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
AccumuloReplicaSystem.buildConfiguration(master1Cluster.getInstanceName(), master1Cluster.getZooKeepers())));
- connMaster1.tableOperations().create(master1Cluster.getInstanceName(), false);
- String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Cluster.getInstanceName());
+ connMaster1.tableOperations().create(master1Table, false);
+ String master1TableId = connMaster1.tableOperations().tableIdMap().get(master1Table);
Assert.assertNotNull(master1TableId);
- connMaster2.tableOperations().create(master2Cluster.getInstanceName(), false);
- String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Cluster.getInstanceName());
+ connMaster2.tableOperations().create(master2Table, false);
+ String master2TableId = connMaster2.tableOperations().tableIdMap().get(master2Table);
Assert.assertNotNull(master2TableId);
// Replicate master1 in the master1 cluster to master2 in the master2 cluster
- connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(), "true");
- connMaster1.tableOperations().setProperty(master1Cluster.getInstanceName(),
+ connMaster1.tableOperations().setProperty(master1Table, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster1.tableOperations().setProperty(master1Table,
Property.TABLE_REPLICATION_TARGETS.getKey() + master2Cluster.getInstanceName(), master2TableId);
// Replicate master2 in the master2 cluster to master1 in the master1 cluster
- connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(), Property.TABLE_REPLICATION.getKey(), "true");
- connMaster2.tableOperations().setProperty(master2Cluster.getInstanceName(),
+ connMaster2.tableOperations().setProperty(master2Table, Property.TABLE_REPLICATION.getKey(), "true");
+ connMaster2.tableOperations().setProperty(master2Table,
Property.TABLE_REPLICATION_TARGETS.getKey() + master1Cluster.getInstanceName(), master1TableId);
// Give our replication user the ability to write to the respective table
- connMaster1.securityOperations().grantTablePermission(master1UserName, master1Cluster.getInstanceName(), TablePermission.WRITE);
- connMaster2.securityOperations().grantTablePermission(master2UserName, master2Cluster.getInstanceName(), TablePermission.WRITE);
+ connMaster1.securityOperations().grantTablePermission(master1UserName, master1Table, TablePermission.WRITE);
+ connMaster2.securityOperations().grantTablePermission(master2UserName, master2Table, TablePermission.WRITE);
IteratorSetting summingCombiner = new IteratorSetting(50, SummingCombiner.class);
SummingCombiner.setEncodingType(summingCombiner, Type.STRING);
@@ -170,17 +171,17 @@ public class CyclicReplicationIT {
// Set a combiner on both instances that will sum multiple values
// We can use this to verify that the mutation was not sent multiple times
- connMaster1.tableOperations().attachIterator(master1Cluster.getInstanceName(), summingCombiner);
- connMaster2.tableOperations().attachIterator(master2Cluster.getInstanceName(), summingCombiner);
+ connMaster1.tableOperations().attachIterator(master1Table, summingCombiner);
+ connMaster2.tableOperations().attachIterator(master2Table, summingCombiner);
// Write a single entry
- BatchWriter bw = connMaster1.createBatchWriter(master1Cluster.getInstanceName(), new BatchWriterConfig());
+ BatchWriter bw = connMaster1.createBatchWriter(master1Table, new BatchWriterConfig());
Mutation m = new Mutation("row");
m.put("count", "", "1");
bw.addMutation(m);
bw.close();
- Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Cluster.getInstanceName());
+ Set<String> files = connMaster1.replicationOperations().referencedFiles(master1Table);
log.info("Found {} that need replication from master1", files);
@@ -194,22 +195,22 @@ public class CyclicReplicationIT {
log.info("Restarted tserver on master1");
// Sanity check that the element is there on master1
- Scanner s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
+ Scanner s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
Entry<Key,Value> entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
// Wait for this table to replicate
- connMaster1.replicationOperations().drain(master1Cluster.getInstanceName(), files);
+ connMaster1.replicationOperations().drain(master1Table, files);
Thread.sleep(5000);
// Check that the element made it to master2 only once
- s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY);
+ s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
// Wait for master2 to finish replicating it back
- files = connMaster2.replicationOperations().referencedFiles(master2Cluster.getInstanceName());
+ files = connMaster2.replicationOperations().referencedFiles(master2Table);
// Kill and restart the tserver to close the WAL on master2
for (ProcessReference proc : master2Cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
@@ -219,16 +220,16 @@ public class CyclicReplicationIT {
master2Cluster.exec(TabletServer.class);
// Check that the element made it to master2 only once
- s = connMaster2.createScanner(master2Cluster.getInstanceName(), Authorizations.EMPTY);
+ s = connMaster2.createScanner(master2Table, Authorizations.EMPTY);
entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
- connMaster2.replicationOperations().drain(master2Cluster.getInstanceName(), files);
+ connMaster2.replicationOperations().drain(master2Table, files);
Thread.sleep(5000);
// Verify that the entry wasn't sent back to master1
- s = connMaster1.createScanner(master1Cluster.getInstanceName(), Authorizations.EMPTY);
+ s = connMaster1.createScanner(master1Table, Authorizations.EMPTY);
entry = Iterables.getOnlyElement(s);
Assert.assertEquals("1", entry.getValue().toString());
} finally {
http://git-wip-us.apache.org/repos/asf/accumulo/blob/84e94a42/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
----------------------------------------------------------------------
diff --git a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
index 6c21962..d561d2f 100644
--- a/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/replication/UnorderedWorkAssignerReplicationIT.java
@@ -113,40 +113,40 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
try {
final Connector connMaster = getConnector();
final Connector connPeer = peerCluster.getConnector("root", ROOT_PASSWORD);
-
+
ReplicationTable.create(connMaster);
String peerUserName = "peer", peerPassword = "foo";
-
+
String peerClusterName = "peer";
connPeer.securityOperations().createLocalUser(peerUserName, new PasswordToken(peerPassword));
-
+
connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + peerClusterName, peerUserName);
connMaster.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + peerClusterName, peerPassword);
-
+
// ...peer = AccumuloReplicaSystem,instanceName,zookeepers
connMaster.instanceOperations().setProperty(
Property.REPLICATION_PEERS.getKey() + peerClusterName,
ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class,
AccumuloReplicaSystem.buildConfiguration(peerCluster.getInstanceName(), peerCluster.getZooKeepers())));
-
+
final String masterTable = "master", peerTable = "peer";
-
+
connMaster.tableOperations().create(masterTable);
String masterTableId = connMaster.tableOperations().tableIdMap().get(masterTable);
Assert.assertNotNull(masterTableId);
-
+
connPeer.tableOperations().create(peerTable);
String peerTableId = connPeer.tableOperations().tableIdMap().get(peerTable);
Assert.assertNotNull(peerTableId);
connPeer.securityOperations().grantTablePermission(peerUserName, peerTable, TablePermission.WRITE);
-
+
// Replicate this table to the peerClusterName in a table with the peerTableId table id
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION.getKey(), "true");
connMaster.tableOperations().setProperty(masterTable, Property.TABLE_REPLICATION_TARGETS.getKey() + peerClusterName, peerTableId);
-
+
// Write some data to table1
BatchWriter bw = connMaster.createBatchWriter(masterTable, new BatchWriterConfig());
for (int rows = 0; rows < 5000; rows++) {
@@ -157,23 +157,23 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
}
bw.addMutation(m);
}
-
+
bw.close();
-
+
log.info("Wrote all data to master cluster");
-
+
final Set<String> filesNeedingReplication = connMaster.replicationOperations().referencedFiles(masterTable);
-
+
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);
}
cluster.exec(TabletServer.class);
-
+
log.info("TabletServer restarted");
for (@SuppressWarnings("unused")
Entry<Key,Value> e : ReplicationTable.getScanner(connMaster)) {}
log.info("TabletServer is online");
-
+
log.info("");
log.info("Fetching metadata records:");
for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
@@ -183,33 +183,33 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
}
}
-
+
log.info("");
log.info("Fetching replication records:");
for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
}
-
+
Future<Boolean> future = executor.submit(new Callable<Boolean>() {
-
+
@Override
public Boolean call() throws Exception {
connMaster.replicationOperations().drain(masterTable, filesNeedingReplication);
log.info("Drain completed");
return true;
}
-
+
});
-
+
try {
future.get(30, TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
Assert.fail("Drain did not finish within 30 seconds");
}
-
+
log.info("drain completed");
-
+
log.info("");
log.info("Fetching metadata records:");
for (Entry<Key,Value> kv : connMaster.createScanner(MetadataTable.NAME, Authorizations.EMPTY)) {
@@ -219,13 +219,13 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
log.info(kv.getKey().toStringNoTruncate() + " " + kv.getValue());
}
}
-
+
log.info("");
log.info("Fetching replication records:");
for (Entry<Key,Value> kv : connMaster.createScanner(ReplicationTable.NAME, Authorizations.EMPTY)) {
log.info(kv.getKey().toStringNoTruncate() + " " + ProtobufUtil.toString(Status.parseFrom(kv.getValue().get())));
}
-
+
Scanner master = connMaster.createScanner(masterTable, Authorizations.EMPTY), peer = connPeer.createScanner(peerTable, Authorizations.EMPTY);
Iterator<Entry<Key,Value>> masterIter = master.iterator(), peerIter = peer.iterator();
Entry<Key,Value> masterEntry = null, peerEntry = null;
@@ -236,10 +236,10 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
masterEntry.getKey().compareTo(peerEntry.getKey(), PartialKey.ROW_COLFAM_COLQUAL_COLVIS));
Assert.assertEquals(masterEntry.getValue(), peerEntry.getValue());
}
-
+
log.info("Last master entry: " + masterEntry);
log.info("Last peer entry: " + peerEntry);
-
+
Assert.assertFalse("Had more data to read from the master", masterIter.hasNext());
Assert.assertFalse("Had more data to read from the peer", peerIter.hasNext());
} finally {
@@ -377,7 +377,7 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
.startsWith(masterTable1));
}
-
+
log.info("Found {} records in {}", countTable, peerTable1);
if (masterTable1Records != countTable) {
@@ -394,7 +394,7 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
Assert.assertTrue("Found unexpected key-value" + entry.getKey().toStringNoTruncate() + " " + entry.getValue(), entry.getKey().getRow().toString()
.startsWith(masterTable2));
}
-
+
log.info("Found {} records in {}", countTable, peerTable2);
if (masterTable2Records != countTable) {
@@ -605,7 +605,6 @@ public class UnorderedWorkAssignerReplicationIT extends ConfigurableMacIT {
Thread.sleep(500);
}
-
for (ProcessReference proc : cluster.getProcesses().get(ServerType.TABLET_SERVER)) {
cluster.killProcess(ServerType.TABLET_SERVER, proc);
}