You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:28:46 UTC
[08/50] [abbrv] git commit: ACCUMULO-2581 Initial impl to use
DistributedWorkQueue to assign replication work.
ACCUMULO-2581 Initial impl to use DistributedWorkQueue to assign replication work.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/31f4e83b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/31f4e83b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/31f4e83b
Branch: refs/heads/ACCUMULO-378
Commit: 31f4e83b94c4f150df2639f251d6835305fa531d
Parents: f584685
Author: Josh Elser <el...@apache.org>
Authored: Thu May 1 16:58:47 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 1 16:58:47 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/core/Constants.java | 2 +
.../org/apache/accumulo/core/conf/Property.java | 6 +-
.../accumulo/core/replication/StatusUtil.java | 24 ++
server/master/pom.xml | 5 +
.../replication/ReplicationWorkAssigner.java | 264 +++++++++++++++++++
.../accumulo/master/replication/Work.java | 98 +++++++
.../ReplicationWorkAssignerTest.java | 217 +++++++++++++++
.../accumulo/master/replication/WorkTest.java | 49 ++++
8 files changed, 664 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 7d602bb..c87119c 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -113,4 +113,6 @@ public class Constants {
public static final String[] PATH_PROPERTY_ENV_VARS = new String[] {"ACCUMULO_HOME", "ACCUMULO_CONF_DIR"};
public static final String HDFS_TABLES_DIR = "/tables";
+
+ public static final String ZREPLICATION = "/replication";
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 2ad1369..637600d 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -454,7 +454,11 @@ public enum Property {
REPLICATION_BATCH_SIZE("replication.batch.size", "1000", PropertyType.COUNT, "Maximum number of updates (WAL) or key-value pairs (RFile) to send in one replication task"),
@Experimental
REPLICATION_SEND_THREAD_POOL_SIZE("replication.send.threads", "1", PropertyType.COUNT, "Size of threadpool used to start replication to slaves"),
-
+ @Experimental
+ REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "20000000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
+ @Experimental
+ REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
+
;
private String key, defaultValue, description;
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
index 3b8ccb6..74327fd 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
@@ -21,6 +21,8 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.proto.Replication.Status;
import org.apache.accumulo.core.replication.proto.Replication.Status.Builder;
+import com.google.protobuf.InvalidProtocolBufferException;
+
/**
* Helper methods to create Status protobuf messages
*/
@@ -161,6 +163,15 @@ public class StatusUtil {
}
/**
+ * @param v Value with serialized Status
+ * @return A Status created from the Value
+ * @throws InvalidProtocolBufferException
+ */
+ public static Status fromValue(Value v) throws InvalidProtocolBufferException {
+   // Pull the raw serialized bytes out of the Value, then let protobuf parse them
+   final byte[] serialized = v.get();
+   return Status.parseFrom(serialized);
+ }
+
+ /**
* Is the given Status fully replicated and is its file ready for deletion on the source
* @param status a Status protobuf
* @return True if the file this Status references can be deleted.
@@ -181,4 +192,17 @@ public class StatusUtil {
return status.getBegin() >= status.getEnd();
}
}
+
+ /**
+  * Given the {@link Status}, is there replication work to be done
+  * <p>
+  * For a Status with an infinite end, no finite begin offset can catch up with the end, so work is required
+  * unless begin has been set to {@link Long#MAX_VALUE}. Otherwise work is required while begin is less than end.
+  *
+  * @param status Status for a file
+  * @return true if replication work is required
+  */
+ public static boolean isWorkRequired(Status status) {
+   if (status.getInfiniteEnd()) {
+     // Only begin == Long.MAX_VALUE means nothing is left to replicate; any other begin still has work
+     return Long.MAX_VALUE != status.getBegin();
+   } else {
+     return status.getBegin() < status.getEnd();
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/pom.xml
----------------------------------------------------------------------
diff --git a/server/master/pom.xml b/server/master/pom.xml
index f9b2990..3b9684c 100644
--- a/server/master/pom.xml
+++ b/server/master/pom.xml
@@ -84,5 +84,10 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
new file mode 100644
index 0000000..f3a8825
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Read work records from the replication table, create work entries for other nodes to complete.
+ * <p>
+ * Uses the DistributedWorkQueue to make the work available for any tserver. This approach does not consider the locality of the tabletserver performing the
+ * work in relation to the data being replicated (local HDFS blocks).
+ */
+public class ReplicationWorkAssigner implements Runnable {
+ private static final Logger log = LoggerFactory.getLogger(ReplicationWorkAssigner.class);
+
+ private Master master;
+ private Connector conn;
+
+ private AccumuloConfiguration conf;
+ private DistributedWorkQueue workQueue;
+ private Set<String> queuedWork;
+ private int maxQueueSize;
+ private ZooCache zooCache;
+
+ /**
+  * Creates an assigner bound to the given master and connector. The configuration, work queue, queued-work set
+  * and ZooCache are left unset here; {@link #run()} creates each of them on first use if a setter has not
+  * already supplied one.
+  *
+  * @param master
+  *          Master consulted by run() for mastership, configuration and the instance
+  * @param conn
+  *          Connector used to scan the replication table
+  */
+ public ReplicationWorkAssigner(Master master, Connector conn) {
+ this.master = master;
+ this.conn = conn;
+ }
+
+ /*
+ * Getters/setters for testing purposes. They are plain field accessors with no validation; run() only
+ * lazily creates conf, workQueue, queuedWork and zooCache when the corresponding field is still null,
+ * so a value injected through a setter is used as-is.
+ */
+ /** @return the Connector used to scan the replication table */
+ protected Connector getConnector() {
+ return conn;
+ }
+
+ /** @param conn Connector to use in place of the one given to the constructor */
+ protected void setConnector(Connector conn) {
+ this.conn = conn;
+ }
+
+ /** @return the configuration in use, or null if none has been set or loaded yet */
+ protected AccumuloConfiguration getConf() {
+ return conf;
+ }
+
+ /** @param conf configuration to use instead of the one run() would fetch from the master */
+ protected void setConf(AccumuloConfiguration conf) {
+ this.conf = conf;
+ }
+
+ /** @return the work queue, or null if it has not been initialized */
+ protected DistributedWorkQueue getWorkQueue() {
+ return workQueue;
+ }
+
+ /** @param workQueue work queue to use instead of the one initializeWorkQueue would create */
+ protected void setWorkQueue(DistributedWorkQueue workQueue) {
+ this.workQueue = workQueue;
+ }
+
+ /** @return the live (not copied) set of queued work keys, or null if it has not been initialized */
+ protected Set<String> getQueuedWork() {
+ return queuedWork;
+ }
+
+ /** @param queuedWork set to track queued work in; stored by reference */
+ protected void setQueuedWork(Set<String> queuedWork) {
+ this.queuedWork = queuedWork;
+ }
+
+ /** @return the queue-size threshold checked by createWork */
+ protected int getMaxQueueSize() {
+ return maxQueueSize;
+ }
+
+ /** @param maxQueueSize threshold to use; note run() overwrites this from the configuration on every pass */
+ protected void setMaxQueueSize(int maxQueueSize) {
+ this.maxQueueSize = maxQueueSize;
+ }
+
+ /** @return the ZooCache used by cleanupFinishedWork, or null if it has not been created */
+ protected ZooCache getZooCache() {
+ return zooCache;
+ }
+
+ /** @param zooCache ZooCache to use instead of the one run() would create */
+ protected void setZooCache(ZooCache zooCache) {
+ this.zooCache = zooCache;
+ }
+
+ /**
+  * Creates the DistributedWorkQueue rooted at this instance's replication node in ZooKeeper
+  *
+  * @param conf
+  *          configuration handed to the new work queue
+  */
+ protected void initializeWorkQueue(AccumuloConfiguration conf) {
+   final String replicationZkPath = ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION;
+   workQueue = new DistributedWorkQueue(replicationZkPath, conf);
+ }
+
+ /**
+  * Initialize the queuedWork set with the work already sent out
+  * <p>
+  * The set is only assigned to the field after the existing work has been read successfully, so a failed read
+  * leaves queuedWork null and initialization can be attempted again.
+  *
+  * @throws IllegalArgumentException
+  *           if queuedWork has already been initialized
+  * @throws RuntimeException
+  *           if the existing work could not be read from the work queue
+  */
+ protected void initializeQueuedWork() {
+   Preconditions.checkArgument(null == queuedWork, "Expected queuedWork to be null");
+   Set<String> existingWork = new HashSet<>();
+   try {
+     existingWork.addAll(workQueue.getWorkQueued());
+   } catch (KeeperException e) {
+     throw new RuntimeException("Error reading existing queued replication work", e);
+   } catch (InterruptedException e) {
+     // Restore the interrupt flag for the owner of this thread before surfacing the failure
+     Thread.currentThread().interrupt();
+     throw new RuntimeException("Error reading existing queued replication work", e);
+   }
+   queuedWork = existingWork;
+ }
+
+ /**
+  * Repeatedly assigns replication work for as long as this process is still the master.
+  * <p>
+  * Each pass lazily creates any collaborator that has not been set (configuration, work queue, queued-work set,
+  * ZooCache), re-reads the maximum queue size, queues new work, prunes finished work, and then sleeps for
+  * {@link Property#REPLICATION_WORK_ASSIGNMENT_SLEEP} so the loop does not spin against the replication table
+  * and ZooKeeper. An interrupt during the sleep restores the interrupt flag and ends the loop.
+  */
+ @Override
+ public void run() {
+   while (master.stillMaster()) {
+     if (null == conf) {
+       conf = master.getConfiguration().getConfiguration();
+     }
+
+     // Get the maximum number of entries we want to queue work for (or the default)
+     maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
+
+     if (null == workQueue) {
+       initializeWorkQueue(conf);
+     }
+
+     if (null == queuedWork) {
+       initializeQueuedWork();
+     }
+
+     if (null == zooCache) {
+       zooCache = new ZooCache();
+     }
+
+     // Scan over the work records, adding the work to the queue
+     createWork();
+
+     // Keep the state of the work we queued correct
+     cleanupFinishedWork();
+
+     // Wait the configured interval before the next pass instead of busy-looping
+     long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
+     try {
+       Thread.sleep(sleepTime);
+     } catch (InterruptedException e) {
+       Thread.currentThread().interrupt();
+       log.warn("Interrupted while waiting to assign more replication work, stopping", e);
+       return;
+     }
+   }
+ }
+
+ /**
+  * Scan over the {@link WorkSection} of the replication table adding work for entries that
+  * have data to replicate and have not already been queued.
+  * <p>
+  * Queued work is tracked by the key <code>filename|serializedTarget</code>, the same key queueWork stores, so
+  * the already-queued check is made against that key rather than the bare filename. Scanning stops early once
+  * the number of queued entries exceeds maxQueueSize. Entries whose Status cannot be deserialized are logged
+  * and skipped. If the replication table does not exist, a warning is logged and nothing is queued.
+  */
+ protected void createWork() {
+   // Create a batchscanner over the replication table's work entries
+   BatchScanner bs;
+   try {
+     bs = ReplicationTable.getBatchScanner(conn, 4);
+   } catch (TableNotFoundException e) {
+     log.warn("Could not find replication table", e);
+     return;
+   }
+
+   try {
+     // Configure the scanner inside the try so it is closed even if configuration fails
+     WorkSection.limit(bs);
+     bs.setRanges(Collections.singleton(new Range()));
+     Text buffer = new Text();
+     for (Entry<Key,Value> entry : bs) {
+       // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
+       // to add more work entries
+       if (queuedWork.size() > maxQueueSize) {
+         log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
+         return;
+       }
+
+       WorkSection.getFile(entry.getKey(), buffer);
+       String file = buffer.toString();
+       Status status;
+       try {
+         status = StatusUtil.fromValue(entry.getValue());
+       } catch (InvalidProtocolBufferException e) {
+         log.warn("Could not deserialize protobuf from work entry for {}", file, e);
+         continue;
+       }
+
+       // Nothing to do for this entry
+       if (!StatusUtil.isWorkRequired(status)) {
+         continue;
+       }
+
+       String filename = new Path(file).getName();
+
+       // The buffer is reused: from here on it holds the serialized target (column qualifier), not the file
+       entry.getKey().getColumnQualifier(buffer);
+       String key = filename + "|" + buffer;
+
+       // Only queue work we haven't already queued for this file and target
+       if (!queuedWork.contains(key)) {
+         queueWork(file, filename, buffer);
+       }
+     }
+   } finally {
+     bs.close();
+   }
+ }
+
+ /**
+  * Distribute the work for the given path with filename
+  * <p>
+  * The work is added to the work queue under the key <code>filename|serializedTarget</code> with the UTF-8 bytes
+  * of the full path as its payload; the key is recorded in queuedWork only if the add succeeds. Failures are
+  * logged and not rethrown. If the thread is interrupted, its interrupt flag is restored.
+  *
+  * @param path
+  *          Full path to a file
+  * @param filename
+  *          Filename
+  * @param serializedTarget
+  *          Serialized replication target, appended to the filename to form the work key
+  */
+ protected void queueWork(String path, String filename, Text serializedTarget) {
+   String key = filename + "|" + serializedTarget;
+   try {
+     workQueue.addWork(key, path.getBytes(StandardCharsets.UTF_8));
+     queuedWork.add(key);
+   } catch (KeeperException e) {
+     log.warn("Could not queue work for {}", path, e);
+   } catch (InterruptedException e) {
+     // Don't lose the interrupt; the caller's loop can observe it
+     Thread.currentThread().interrupt();
+     log.warn("Could not queue work for {}", path, e);
+   }
+ }
+
+ /**
+  * Drops every queued work key whose node is no longer present under the replication path in ZooKeeper.
+  */
+ protected void cleanupFinishedWork() {
+   for (Iterator<String> queued = queuedWork.iterator(); queued.hasNext();) {
+     String workKey = queued.next();
+     String zkPath = ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION + "/" + workKey;
+     // Null equates to the work was finished
+     if (null == zooCache.get(zkPath)) {
+       queued.remove();
+     }
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
new file mode 100644
index 0000000..cb45b44
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Objects;
+
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Encapsulates a file (path) and {@link Status}
+ */
+public class Work implements Writable {
+
+ private String file;
+
+ private Status status;
+
+ /** Creates an empty Work; file and status stay null until set or populated by readFields. */
+ public Work() { }
+
+ /**
+  * @param file
+  *          file (path) this work refers to
+  * @param status
+  *          Status associated with the file
+  */
+ public Work(String file, Status status) {
+ this.file = file;
+ this.status = status;
+ }
+
+ /** @return the file (path), or null if it has not been set */
+ public String getFile() {
+ return file;
+ }
+
+ /** @param file file (path) this work refers to */
+ public void setFile(String file) {
+ this.file = file;
+ }
+
+ /** @return the Status, or null if it has not been set */
+ public Status getStatus() {
+ return status;
+ }
+
+ /** @param status Status associated with the file */
+ public void setStatus(Status status) {
+ this.status = status;
+ }
+
+ /**
+  * Writes the file as a string followed by the Status protobuf as a vint length and that many bytes.
+  */
+ @Override
+ public void write(DataOutput out) throws IOException {
+   WritableUtils.writeString(out, file);
+   // Length-prefix the protobuf so readFields knows how many bytes to consume
+   final byte[] serializedStatus = status.toByteArray();
+   WritableUtils.writeVInt(out, serializedStatus.length);
+   out.write(serializedStatus);
+ }
+
+ /**
+  * Reads the fields in the order write emits them: the file string, then a vint length and that many bytes
+  * of serialized Status.
+  */
+ @Override
+ public void readFields(DataInput in) throws IOException {
+   file = WritableUtils.readString(in);
+   final byte[] serializedStatus = new byte[WritableUtils.readVInt(in)];
+   in.readFully(serializedStatus);
+   status = Status.parseFrom(serializedStatus);
+ }
+
+ /**
+  * Two Work objects are equal when both their files and their Statuses are equal. Null fields (possible after
+  * the no-arg constructor) are compared null-safely instead of throwing.
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (this == o) {
+     return true;
+   }
+   if (!(o instanceof Work)) {
+     return false;
+   }
+
+   Work other = (Work) o;
+   return Objects.equals(file, other.getFile()) && Objects.equals(status, other.getStatus());
+ }
+
+ /**
+  * @return the file followed by a short debug rendering of the Status; a null Status is rendered as "null"
+  *         rather than throwing
+  */
+ @Override
+ public String toString() {
+   String statusText = null == status ? "null" : TextFormat.shortDebugString(status);
+   return file + " " + statusText;
+ }
+
+ /**
+  * XOR of the file and Status hash codes, with a null field contributing 0 instead of throwing. For non-null
+  * fields the value is the same as before.
+  */
+ @Override
+ public int hashCode() {
+   return Objects.hashCode(file) ^ Objects.hashCode(status);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
new file mode 100644
index 0000000..b596126
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ *
+ */
+public class ReplicationWorkAssignerTest {
+
+ @Rule
+ public TestName test = new TestName();
+
+ private Master master;
+ private Connector conn;
+ private ReplicationWorkAssigner assigner;
+
+ /**
+  * Builds a fresh assigner around mocked Master and Connector objects before each test. The mocks are left in
+  * record state here; tests that need a real connector replace it via setConnector.
+  */
+ @Before
+ public void init() {
+ master = createMock(Master.class);
+ conn = createMock(Connector.class);
+ assigner = new ReplicationWorkAssigner(master, conn);
+ }
+
+ /**
+  * queueWork must add the work to the DistributedWorkQueue under the key <code>filename|serializedTarget</code>
+  * and record that same key in the queued-work set.
+  */
+ @Test
+ public void workQueuedUsingFileName() throws Exception {
+   ReplicationTarget target = new ReplicationTarget("cluster1", "table1");
+   Text serializedTarget = ReplicationTarget.toText(target);
+
+   DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+   Set<String> queuedWork = new HashSet<>();
+   assigner.setQueuedWork(queuedWork);
+   assigner.setWorkQueue(workQueue);
+
+   Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID());
+   String expectedKey = p.getName() + "|" + serializedTarget;
+
+   // Record the expectation and switch to replay before exercising the code under test; calling queueWork
+   // while the mock is still recording would only register another expectation instead of checking anything
+   workQueue.addWork(expectedKey, p.toString().getBytes(StandardCharsets.UTF_8));
+   expectLastCall().once();
+
+   replay(workQueue);
+
+   assigner.queueWork(p.toString(), p.getName(), serializedTarget);
+
+   // Fail if addWork was not invoked exactly as expected
+   org.easymock.EasyMock.verify(workQueue);
+
+   Assert.assertEquals(1, queuedWork.size());
+   Assert.assertEquals(expectedKey, queuedWork.iterator().next());
+ }
+
+ /**
+  * initializeQueuedWork must seed the queued-work set with everything the work queue already reports.
+  */
+ @Test
+ public void existingWorkIsReQueued() throws Exception {
+   List<String> existingWork = Arrays.asList("/accumulo/wal/tserver+port/wal1", "/accumulo/wal/tserver+port/wal2");
+
+   // Work queue that reports the two pre-existing entries
+   DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+   expect(workQueue.getWorkQueued()).andReturn(existingWork);
+   replay(workQueue);
+
+   assigner.setWorkQueue(workQueue);
+   assigner.initializeQueuedWork();
+
+   Set<String> actualQueued = assigner.getQueuedWork();
+   Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), actualQueued.size());
+   Assert.assertTrue("Expected all existing work to be queued", actualQueued.containsAll(existingWork));
+ }
+
+ /**
+  * Writes two work entries that use openWithUnknownLengthValue to a mock replication table, records the
+  * addWork/add calls expected for them in sorted-file order, and runs createWork.
+  * <p>
+  * NOTE(review): the workQueue mock created below is never passed to assigner.setWorkQueue, no expectation is
+  * recorded for queuedWork.contains(...) (which createWork calls before queuing), and the mocks are never
+  * verified. As written the recorded expectations do not appear to be enforced -- confirm and tighten.
+  */
+ @Test
+ public void createWorkForFilesNeedingIt() throws Exception {
+ ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2");
+ Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2);
+
+ MockInstance inst = new MockInstance(test.getMethodName());
+ Credentials creds = new Credentials("root", new PasswordToken(""));
+ Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+ // Set the connector
+ assigner.setConnector(conn);
+
+ // Create and grant ourselves write to the replication table
+ ReplicationTable.create(conn);
+ conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+ // Create two mutations, both of which need replication work done
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget1, StatusUtil.openWithUnknownLengthValue());
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget2, StatusUtil.openWithUnknownLengthValue());
+ bw.addMutation(m);
+
+ bw.close();
+
+ // NOTE(review): this mock is not installed on the assigner (no setWorkQueue call) -- confirm intent
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ @SuppressWarnings("unchecked")
+ HashSet<String> queuedWork = createMock(HashSet.class);
+ assigner.setQueuedWork(queuedWork);
+
+ // size() always reports 0 so the max-queue-size guard in createWork never trips
+ expect(queuedWork.size()).andReturn(0).anyTimes();
+
+ // Make sure we expect the invocations in the correct order (accumulo is sorted)
+ if (file1.compareTo(file2) <= 0) {
+ String key = filename1 + "|" + serializedTarget1;
+ workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8));
+ expectLastCall().once();
+ expect(queuedWork.add(key)).andReturn(true).once();
+
+ key = filename2 + "|" + serializedTarget2;
+ workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8));
+ expectLastCall().once();
+ expect(queuedWork.add(key)).andReturn(true).once();
+ } else {
+ String key = filename2 + "|" + serializedTarget2;
+ workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8));
+ expectLastCall().once();
+ expect(queuedWork.add(key)).andReturn(true).once();
+
+ key = filename1 + "|" + serializedTarget1;
+ workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8));
+ expectLastCall().once();
+ expect(queuedWork.add(key)).andReturn(true).once();
+ }
+
+ replay(queuedWork, workQueue);
+
+ assigner.createWork();
+ // NOTE(review): no verify(queuedWork, workQueue) follows, so missing calls would not fail the test
+ }
+
+ /**
+  * Writes two work entries that use newFileValue to a mock replication table and runs createWork with mocks
+  * that expect no addWork/add calls, i.e. no work should be queued for them.
+  * <p>
+  * NOTE(review): as in createWorkForFilesNeedingIt, the workQueue mock is never installed on the assigner and
+  * the mocks are never verified -- confirm the test fails the way it is meant to if work is queued.
+  */
+ @Test
+ public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
+ ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2");
+ Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2);
+
+ MockInstance inst = new MockInstance(test.getMethodName());
+ Credentials creds = new Credentials("root", new PasswordToken(""));
+ Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+ // Set the connector
+ assigner.setConnector(conn);
+
+ // Create and grant ourselves write to the replication table
+ ReplicationTable.create(conn);
+ conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+ // Create two mutations using newFileValue; per this test's intent, neither should need replication work
+ BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+ String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+ String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+ Mutation m = new Mutation(file1);
+ WorkSection.add(m, serializedTarget1, StatusUtil.newFileValue());
+ bw.addMutation(m);
+
+ m = new Mutation(file2);
+ WorkSection.add(m, serializedTarget2, StatusUtil.newFileValue());
+ bw.addMutation(m);
+
+ bw.close();
+
+ // NOTE(review): this mock is not installed on the assigner (no setWorkQueue call) -- confirm intent
+ DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+ @SuppressWarnings("unchecked")
+ HashSet<String> queuedWork = createMock(HashSet.class);
+ assigner.setQueuedWork(queuedWork);
+
+ // Only size() is expected; any add/contains/addWork call would be an unexpected invocation on the mocks
+ expect(queuedWork.size()).andReturn(0).anyTimes();
+
+ replay(queuedWork, workQueue);
+
+ assigner.createWork();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
new file mode 100644
index 0000000..09530a7
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ *
+ */
+public class WorkTest {
+
+  /**
+   * A Work written with write() and read back with readFields() must equal the original.
+   */
+  @Test
+  public void serialization() throws IOException {
+    Work expected = new Work("/foo/bar", StatusUtil.openWithUnknownLength());
+
+    // Serialize into an in-memory buffer
+    ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
+    DataOutputStream dataOut = new DataOutputStream(bytesOut);
+    expected.write(dataOut);
+
+    // Deserialize the same bytes into a fresh instance
+    DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(bytesOut.toByteArray()));
+    Work actual = new Work();
+    actual.readFields(dataIn);
+
+    Assert.assertEquals(expected, actual);
+  }
+
+}