You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:28:46 UTC

[08/50] [abbrv] git commit: ACCUMULO-2581 Initial impl to use DistributedWorkQueue to assign replication work.

ACCUMULO-2581 Initial impl to use DistributedWorkQueue to assign replication work.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/31f4e83b
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/31f4e83b
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/31f4e83b

Branch: refs/heads/ACCUMULO-378
Commit: 31f4e83b94c4f150df2639f251d6835305fa531d
Parents: f584685
Author: Josh Elser <el...@apache.org>
Authored: Thu May 1 16:58:47 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 1 16:58:47 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/core/Constants.java     |   2 +
 .../org/apache/accumulo/core/conf/Property.java |   6 +-
 .../accumulo/core/replication/StatusUtil.java   |  24 ++
 server/master/pom.xml                           |   5 +
 .../replication/ReplicationWorkAssigner.java    | 264 +++++++++++++++++++
 .../accumulo/master/replication/Work.java       |  98 +++++++
 .../ReplicationWorkAssignerTest.java            | 217 +++++++++++++++
 .../accumulo/master/replication/WorkTest.java   |  49 ++++
 8 files changed, 664 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/Constants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/Constants.java b/core/src/main/java/org/apache/accumulo/core/Constants.java
index 7d602bb..c87119c 100644
--- a/core/src/main/java/org/apache/accumulo/core/Constants.java
+++ b/core/src/main/java/org/apache/accumulo/core/Constants.java
@@ -113,4 +113,6 @@ public class Constants {
   public static final String[] PATH_PROPERTY_ENV_VARS = new String[] {"ACCUMULO_HOME", "ACCUMULO_CONF_DIR"};
 
   public static final String HDFS_TABLES_DIR = "/tables";
+
+  public static final String ZREPLICATION = "/replication";
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 2ad1369..637600d 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -454,7 +454,11 @@ public enum Property {
   REPLICATION_BATCH_SIZE("replication.batch.size", "1000", PropertyType.COUNT, "Maximum number of updates (WAL) or key-value pairs (RFile) to send in one replication task"),
   @Experimental
   REPLICATION_SEND_THREAD_POOL_SIZE("replication.send.threads", "1", PropertyType.COUNT, "Size of threadpool used to start replication to slaves"),
-  
+  @Experimental
+  REPLICATION_MAX_WORK_QUEUE("replication.max.work.queue", "20000000", PropertyType.COUNT, "Upper bound of the number of files queued for replication"),
+  @Experimental
+  REPLICATION_WORK_ASSIGNMENT_SLEEP("replication.work.assignment.sleep", "30s", PropertyType.TIMEDURATION, "Amount of time to sleep between replication work assignment"),
+
   ;
 
   private String key, defaultValue, description;

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
index 3b8ccb6..74327fd 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/StatusUtil.java
@@ -21,6 +21,8 @@ import org.apache.accumulo.core.protobuf.ProtobufUtil;
 import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.core.replication.proto.Replication.Status.Builder;
 
+import com.google.protobuf.InvalidProtocolBufferException;
+
 /**
  * Helper methods to create Status protobuf messages
  */
@@ -161,6 +163,15 @@ public class StatusUtil {
   }
 
   /**
+   * @param v Value with serialized Status
+   * @return A Status created from the Value
+   * @throws InvalidProtocolBufferException
+   */
+  public static Status fromValue(Value v) throws InvalidProtocolBufferException {
+    return Status.parseFrom(v.get());
+  }
+
+  /**
    * Is the given Status fully replicated and is its file ready for deletion on the source
    * @param status a Status protobuf
    * @return True if the file this Status references can be deleted.
@@ -181,4 +192,17 @@ public class StatusUtil {
       return status.getBegin() >= status.getEnd();
     }
   }
+
+  /**
+   * Given the {@link Status}, is there replication work to be done
+   * @param status Status for a file
+   * @return true if replication work is required
+   */
+  public static boolean isWorkRequired(Status status) {
+    if (status.getInfiniteEnd()) {
+      return Long.MAX_VALUE == status.getBegin();
+    } else {
+      return status.getBegin() < status.getEnd();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/pom.xml
----------------------------------------------------------------------
diff --git a/server/master/pom.xml b/server/master/pom.xml
index f9b2990..3b9684c 100644
--- a/server/master/pom.xml
+++ b/server/master/pom.xml
@@ -84,5 +84,10 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
new file mode 100644
index 0000000..f3a8825
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/ReplicationWorkAssigner.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.BatchScanner;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.accumulo.server.zookeeper.ZooCache;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * Read work records from the replication table, create work entries for other nodes to complete.
+ * <p>
+ * Uses the DistributedWorkQueue to make the work available for any tserver. This approach does not consider the locality of the tabletserver performing the
+ * work in relation to the data being replicated (local HDFS blocks).
+ */
+public class ReplicationWorkAssigner implements Runnable {
+  private static final Logger log = LoggerFactory.getLogger(ReplicationWorkAssigner.class);
+
+  private Master master;
+  private Connector conn;
+
+  private AccumuloConfiguration conf;
+  private DistributedWorkQueue workQueue;
+  private Set<String> queuedWork;
+  private int maxQueueSize;
+  private ZooCache zooCache;
+
+  public ReplicationWorkAssigner(Master master, Connector conn) {
+    this.master = master;
+    this.conn = conn;
+  }
+
+  /*
+   * Getters/setters for testing purposes
+   */
+  protected Connector getConnector() {
+    return conn;
+  }
+
+  protected void setConnector(Connector conn) {
+    this.conn = conn;
+  }
+
+  protected AccumuloConfiguration getConf() {
+    return conf;
+  }
+
+  protected void setConf(AccumuloConfiguration conf) {
+    this.conf = conf;
+  }
+
+  protected DistributedWorkQueue getWorkQueue() {
+    return workQueue;
+  }
+
+  protected void setWorkQueue(DistributedWorkQueue workQueue) {
+    this.workQueue = workQueue;
+  }
+
+  protected Set<String> getQueuedWork() {
+    return queuedWork;
+  }
+
+  protected void setQueuedWork(Set<String> queuedWork) {
+    this.queuedWork = queuedWork;
+  }
+
+  protected int getMaxQueueSize() {
+    return maxQueueSize;
+  }
+
+  protected void setMaxQueueSize(int maxQueueSize) {
+    this.maxQueueSize = maxQueueSize;
+  }
+
+  protected ZooCache getZooCache() {
+    return zooCache;
+  }
+
+  protected void setZooCache(ZooCache zooCache) {
+    this.zooCache = zooCache;
+  }
+
+  /**
+   * Initialize the DistributedWorkQueue using the proper ZK location
+   * 
+   * @param conf
+   */
+  protected void initializeWorkQueue(AccumuloConfiguration conf) {
+    workQueue = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION, conf);
+  }
+
+  /**
+   * Initialize the queuedWork set with the work already sent out
+   */
+  protected void initializeQueuedWork() {
+    Preconditions.checkArgument(null == queuedWork, "Expected queuedWork to be null");
+    queuedWork = new HashSet<>();
+    try {
+      queuedWork.addAll(workQueue.getWorkQueued());
+    } catch (KeeperException | InterruptedException e) {
+      throw new RuntimeException("Error reading existing queued replication work", e);
+    }
+  }
+
+  @Override
+  public void run() {
+    while (master.stillMaster()) {
+      if (null == conf) {
+        conf = master.getConfiguration().getConfiguration();
+      }
+
+      // Get the maximum number of entries we want to queue work for (or the default)
+      maxQueueSize = conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
+
+      if (null == workQueue) {
+        initializeWorkQueue(conf);
+      }
+
+      if (null == queuedWork) {
+        initializeQueuedWork();
+      }
+
+      if (null == zooCache) {
+        zooCache = new ZooCache();
+      }
+
+      // Scan over the work records, adding the work to the queue
+      createWork();
+
+      // Keep the state of the work we queued correct
+      cleanupFinishedWork();
+    }
+  }
+
+  /**
+   * Scan over the {@link WorkSection} of the replication table adding work for entries that
+   * have data to replicate and have not already been queued.
+   */
+  protected void createWork() {
+    // Create a batchscanner over the replication table's work entries
+    BatchScanner bs;
+    try {
+      bs = ReplicationTable.getBatchScanner(conn, 4);
+    } catch (TableNotFoundException e) {
+      log.warn("Could not find replication table", e);
+      return;
+    }
+
+    WorkSection.limit(bs);
+    bs.setRanges(Collections.singleton(new Range()));
+    Text buffer = new Text();
+    try {
+      for (Entry<Key,Value> entry : bs) {
+        // If we're not working off the entries, we need to not shoot ourselves in the foot by continuing
+        // to add more work entries
+        if (queuedWork.size() > maxQueueSize) {
+          log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", maxQueueSize);
+          return;
+        }
+
+        WorkSection.getFile(entry.getKey(), buffer);
+        String file = buffer.toString();
+        Status status;
+        try {
+          status = StatusUtil.fromValue(entry.getValue());
+        } catch (InvalidProtocolBufferException e) {
+          log.warn("Could not deserialize protobuf from work entry for {}", file, e);
+          continue;
+        }
+
+        // If there is work to do
+        if (StatusUtil.isWorkRequired(status)) {
+          Path p = new Path(file);
+          String filename = p.getName();
+
+          // And, we haven't already queued this file up for work already
+          if (!queuedWork.contains(filename)) {
+            entry.getKey().getColumnQualifier(buffer);
+            queueWork(file, filename, buffer);
+          }
+        }
+      }
+    } finally {
+      if (null != bs) {
+        bs.close();
+      }
+    }
+  }
+
+  /**
+   * Distribute the work for the given path with filename
+   * 
+   * @param path
+   *          Full path to a file
+   * @param filename
+   *          Filename
+   */
+  protected void queueWork(String path, String filename, Text serializedTarget) {
+    try {
+      String key = filename + "|" + serializedTarget;
+      workQueue.addWork(key, path.getBytes(StandardCharsets.UTF_8));
+      queuedWork.add(key);
+    } catch (KeeperException | InterruptedException e) {
+      log.warn("Could not queue work for {}", path, e);
+    }
+  }
+
+  /**
+   * Iterate over the queued work to remove entries that have been completed.
+   */
+  protected void cleanupFinishedWork() {
+    Iterator<String> work = queuedWork.iterator();
+    while (work.hasNext()) {
+      String filename = work.next();
+      // Null equates to the work was finished
+      if (null == zooCache.get(ZooUtil.getRoot(master.getInstance()) + Constants.ZREPLICATION + "/" + filename)) {
+        work.remove();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
new file mode 100644
index 0000000..cb45b44
--- /dev/null
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/Work.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.accumulo.core.replication.proto.Replication.Status;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Encapsulates a file (path) and {@link Status}
+ */
+public class Work implements Writable {
+
+  private String file;
+
+  private Status status;
+
+  public Work() { }
+
+  public Work(String file, Status status) {
+    this.file = file;
+    this.status = status;
+  }
+
+  public String getFile() {
+    return file;
+  }
+
+  public void setFile(String file) {
+    this.file = file;
+  }
+
+  public Status getStatus() {
+    return status;
+  }
+
+  public void setStatus(Status status) {
+    this.status = status;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    WritableUtils.writeString(out, file);
+    byte[] bytes = status.toByteArray();
+    WritableUtils.writeVInt(out, bytes.length);
+    out.write(bytes);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    file = WritableUtils.readString(in);
+    int len = WritableUtils.readVInt(in);
+    byte[] bytes = new byte[len];
+    in.readFully(bytes);
+    status = Status.parseFrom(bytes);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (o instanceof Work) {
+      Work other = (Work) o;
+      return file.equals(other.getFile()) && status.equals(other.getStatus());
+    }
+
+    return false;
+  }
+ 
+  @Override
+  public String toString() {
+    return file + " " + TextFormat.shortDebugString(status);
+  }
+
+  @Override
+  public int hashCode() {
+    return file.hashCode() ^ status.hashCode();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
new file mode 100644
index 0000000..b596126
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/ReplicationWorkAssignerTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import static org.easymock.EasyMock.createMock;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.replay;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Connector;
+import org.apache.accumulo.core.client.mock.MockInstance;
+import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
+import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.security.Credentials;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.master.Master;
+import org.apache.accumulo.server.replication.ReplicationTable;
+import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * 
+ */
+public class ReplicationWorkAssignerTest {
+
+  @Rule
+  public TestName test = new TestName();
+
+  private Master master;
+  private Connector conn;
+  private ReplicationWorkAssigner assigner;
+
+  @Before
+  public void init() {
+    master = createMock(Master.class);
+    conn = createMock(Connector.class);
+    assigner = new ReplicationWorkAssigner(master, conn);
+  }
+
+  @Test
+  public void workQueuedUsingFileName() throws Exception {
+    ReplicationTarget target = new ReplicationTarget("cluster1", "table1"); 
+    Text serializedTarget = ReplicationTarget.toText(target);
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    Set<String> queuedWork = new HashSet<>();
+    assigner.setQueuedWork(queuedWork);
+    assigner.setWorkQueue(workQueue);
+
+    Path p = new Path("/accumulo/wal/tserver+port/" + UUID.randomUUID());
+    assigner.queueWork(p.toString(), p.getName(), serializedTarget);
+    
+    workQueue.addWork(p.getName(), p.toString().getBytes(StandardCharsets.UTF_8));
+    expectLastCall().once();
+
+    replay(workQueue);
+
+    Assert.assertEquals(1, queuedWork.size());
+    Assert.assertEquals(p.getName() + "|" + serializedTarget, queuedWork.iterator().next());
+  }
+
+  @Test
+  public void existingWorkIsReQueued() throws Exception {
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+
+    List<String> existingWork = Arrays.asList("/accumulo/wal/tserver+port/wal1", "/accumulo/wal/tserver+port/wal2");
+    expect(workQueue.getWorkQueued()).andReturn(existingWork);
+
+    replay(workQueue);
+
+    assigner.setWorkQueue(workQueue);
+    assigner.initializeQueuedWork();
+
+    Set<String> queuedWork = assigner.getQueuedWork();
+    Assert.assertEquals("Expected existing work and queued work to be the same size", existingWork.size(), queuedWork.size());
+    Assert.assertTrue("Expected all existing work to be queued", queuedWork.containsAll(existingWork));
+  }
+
+  @Test
+  public void createWorkForFilesNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2"); 
+    Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2);
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, StatusUtil.openWithUnknownLengthValue());
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, StatusUtil.openWithUnknownLengthValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    @SuppressWarnings("unchecked")
+    HashSet<String> queuedWork = createMock(HashSet.class);
+    assigner.setQueuedWork(queuedWork);
+
+    expect(queuedWork.size()).andReturn(0).anyTimes();
+
+    // Make sure we expect the invocations in the correct order (accumulo is sorted)
+    if (file1.compareTo(file2) <= 0) {
+      String key = filename1 + "|" + serializedTarget1;
+      workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8));
+      expectLastCall().once();
+      expect(queuedWork.add(key)).andReturn(true).once();
+      
+      key = filename2 + "|" + serializedTarget2;
+      workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8));
+      expectLastCall().once();
+      expect(queuedWork.add(key)).andReturn(true).once();
+    } else {
+      String key = filename2 + "|" + serializedTarget2;
+      workQueue.addWork(key, file2.getBytes(StandardCharsets.UTF_8));
+      expectLastCall().once();
+      expect(queuedWork.add(key)).andReturn(true).once();
+
+      key = filename1 + "|" + serializedTarget1;
+      workQueue.addWork(key, file1.getBytes(StandardCharsets.UTF_8));
+      expectLastCall().once();
+      expect(queuedWork.add(key)).andReturn(true).once();
+    }
+
+    replay(queuedWork, workQueue);
+
+    assigner.createWork();
+  }
+
+  @Test
+  public void doNotCreateWorkForFilesNotNeedingIt() throws Exception {
+    ReplicationTarget target1 = new ReplicationTarget("cluster1", "table1"), target2 = new ReplicationTarget("cluster1", "table2"); 
+    Text serializedTarget1 = ReplicationTarget.toText(target1), serializedTarget2 = ReplicationTarget.toText(target2);
+
+    MockInstance inst = new MockInstance(test.getMethodName());
+    Credentials creds = new Credentials("root", new PasswordToken(""));
+    Connector conn = inst.getConnector(creds.getPrincipal(), creds.getToken());
+
+    // Set the connector
+    assigner.setConnector(conn);
+
+    // Create and grant ourselves write to the replication table
+    ReplicationTable.create(conn);
+    conn.securityOperations().grantTablePermission("root", ReplicationTable.NAME, TablePermission.WRITE);
+
+    // Create two mutations, both of which need replication work done
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    String filename1 = UUID.randomUUID().toString(), filename2 = UUID.randomUUID().toString();
+    String file1 = "/accumulo/wal/tserver+port/" + filename1, file2 = "/accumulo/wal/tserver+port/" + filename2;
+    Mutation m = new Mutation(file1);
+    WorkSection.add(m, serializedTarget1, StatusUtil.newFileValue());
+    bw.addMutation(m);
+
+    m = new Mutation(file2);
+    WorkSection.add(m, serializedTarget2, StatusUtil.newFileValue());
+    bw.addMutation(m);
+
+    bw.close();
+
+    DistributedWorkQueue workQueue = createMock(DistributedWorkQueue.class);
+    @SuppressWarnings("unchecked")
+    HashSet<String> queuedWork = createMock(HashSet.class);
+    assigner.setQueuedWork(queuedWork);
+
+    expect(queuedWork.size()).andReturn(0).anyTimes();
+
+    replay(queuedWork, workQueue);
+
+    assigner.createWork();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/accumulo/blob/31f4e83b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
new file mode 100644
index 0000000..09530a7
--- /dev/null
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.master.replication;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * 
+ */
+public class WorkTest {
+
+  @Test
+  public void serialization() throws IOException {
+    Work w = new Work("/foo/bar", StatusUtil.openWithUnknownLength());
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    w.write(new DataOutputStream(baos));
+
+    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
+
+    Work newW = new Work();
+    newW.readFields(new DataInputStream(bais));
+
+    Assert.assertEquals(w, newW);
+  }
+
+}