You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/09 17:29:21 UTC

[43/50] [abbrv] git commit: ACCUMULO-2709 Don't create the work entry unless there's work to be done

ACCUMULO-2709 Don't create the work entry unless there's work to be done

We can avoid writing some mutations by checking whether the status record
needs replication. The WorkAssigner already performed this check downstream,
but it is more efficient to do it earlier in the pipeline.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/b5cd35a6
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/b5cd35a6
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/b5cd35a6

Branch: refs/heads/ACCUMULO-378
Commit: b5cd35a6d683c5ff315d5f6ab7f1879d798c1951
Parents: 3579b67
Author: Josh Elser <el...@apache.org>
Authored: Thu May 8 12:19:37 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Thu May 8 12:19:37 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/replication/WorkMaker.java  | 23 +++++++-
 .../master/replication/WorkMakerTest.java       | 61 +++++++++++++++++---
 2 files changed, 74 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/b5cd35a6/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
index cfc756c..956d550 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkMaker.java
@@ -34,6 +34,8 @@ import org.apache.accumulo.core.replication.ReplicationSchema;
 import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
+import org.apache.accumulo.core.replication.StatusUtil;
+import org.apache.accumulo.core.replication.proto.Replication.Status;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.replication.ReplicationTable;
@@ -41,13 +43,16 @@ import org.apache.accumulo.trace.instrument.Span;
 import org.apache.accumulo.trace.instrument.Trace;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
-import org.apache.log4j.Logger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
 
 /**
  * Reads replication records from the replication table and creates work records which include target replication system information.
  */
 public class WorkMaker {
-  private static final Logger log = Logger.getLogger(WorkMaker.class);
+  private static final Logger log = LoggerFactory.getLogger(WorkMaker.class);
 
   private final Connector conn;
 
@@ -88,6 +93,20 @@ public class WorkMaker {
         ReplicationSchema.StatusSection.getFile(entry.getKey(), file);
         ReplicationSchema.StatusSection.getTableId(entry.getKey(), tableId);
         log.info("Processing replication status record for " + file + " on table "+ tableId);
+
+        Status status;
+        try {
+          status = Status.parseFrom(entry.getValue().get());
+        } catch (InvalidProtocolBufferException e) {
+          log.error("Could not parse protobuf for {} from table {}", file, tableId);
+          continue;
+        }
+
+        // Don't create the record if we have nothing to do
+        // TODO put this into a filter on serverside
+        if (!StatusUtil.isWorkRequired(status)) {
+          continue;
+        }
   
         // Get the table configuration for the table specified by the status record
         tableConf = ServerConfiguration.getTableConfiguration(conn.getInstance(), tableId.toString());

http://git-wip-us.apache.org/repos/asf/accumulo/blob/b5cd35a6/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
----------------------------------------------------------------------
diff --git a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
index bebab51..34bb567 100644
--- a/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
+++ b/server/master/src/test/java/org/apache/accumulo/master/replication/WorkMakerTest.java
@@ -28,16 +28,16 @@ import org.apache.accumulo.core.client.Connector;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.mock.MockInstance;
 import org.apache.accumulo.core.client.security.tokens.PasswordToken;
+import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Key;
-import org.apache.accumulo.core.data.KeyExtent;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.replication.ReplicationSchema.StatusSection;
 import org.apache.accumulo.core.replication.ReplicationSchema.WorkSection;
 import org.apache.accumulo.core.replication.ReplicationTarget;
 import org.apache.accumulo.core.replication.StatusUtil;
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.replication.ReplicationTable;
-import org.apache.accumulo.server.util.ReplicationTableUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.junit.Assert;
@@ -47,6 +47,7 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
 
 /**
  * 
@@ -82,19 +83,24 @@ public class WorkMakerTest {
     String tableId = conn.tableOperations().tableIdMap().get(table);
     String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
 
-    KeyExtent extent = new KeyExtent(new Text(tableId), null, null);
-    Mutation m = ReplicationTableUtil.createUpdateMutation(new Path(file), StatusUtil.fileClosedValue(), extent);
+    Mutation m = new Mutation(new Path(file).toString());
+    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue());
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     bw.addMutation(m);
     bw.flush();
 
+    // Assert that we have one record in the status section
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Assert.assertEquals(1, Iterables.size(s));
+
     WorkMaker workMaker = new WorkMaker(conn);
 
     ReplicationTarget expected = new ReplicationTarget("remote_cluster_1", "4");
     workMaker.setBatchWriter(bw);
     workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(), ImmutableMap.of("remote_cluster_1", "4"));
 
-    Scanner s = ReplicationTable.getScanner(conn);
+    s = ReplicationTable.getScanner(conn);
     WorkSection.limit(s);
 
     Iterator<Entry<Key,Value>> iter = s.iterator();
@@ -119,12 +125,17 @@ public class WorkMakerTest {
     String tableId = conn.tableOperations().tableIdMap().get(table);
     String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
 
-    KeyExtent extent = new KeyExtent(new Text(tableId), null, null);
-    Mutation m = ReplicationTableUtil.createUpdateMutation(new Path(file), StatusUtil.fileClosedValue(), extent);
+    Mutation m = new Mutation(new Path(file).toString());
+    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.fileClosedValue());
     BatchWriter bw = ReplicationTable.getBatchWriter(conn);
     bw.addMutation(m);
     bw.flush();
 
+    // Assert that we have one record in the status section
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Assert.assertEquals(1, Iterables.size(s));
+
     WorkMaker workMaker = new WorkMaker(conn);
 
     Map<String,String> targetClusters = ImmutableMap.of("remote_cluster_1", "4", "remote_cluster_2", "6", "remote_cluster_3", "8");
@@ -135,7 +146,7 @@ public class WorkMakerTest {
     workMaker.setBatchWriter(bw);
     workMaker.addWorkRecord(new Text(file), StatusUtil.fileClosedValue(), targetClusters);
 
-    Scanner s = ReplicationTable.getScanner(conn);
+    s = ReplicationTable.getScanner(conn);
     WorkSection.limit(s);
 
     Set<ReplicationTarget> actualTargets = new HashSet<>();
@@ -154,4 +165,38 @@ public class WorkMakerTest {
 
     Assert.assertTrue("Found extra replication work entries: " + actualTargets, actualTargets.isEmpty());
   }
+
+  @Test
+  public void dontCreateWorkForEntriesWithNothingToReplicate() throws Exception {
+    String table = name.getMethodName();
+    conn.tableOperations().create(name.getMethodName());
+    String tableId = conn.tableOperations().tableIdMap().get(table);
+    String file = "hdfs://localhost:8020/accumulo/wal/123456-1234-1234-12345678";
+
+    Mutation m = new Mutation(new Path(file).toString());
+    m.put(StatusSection.NAME, new Text(tableId), StatusUtil.newFileValue());
+    BatchWriter bw = ReplicationTable.getBatchWriter(conn);
+    bw.addMutation(m);
+    bw.flush();
+
+    // Sanity check: exactly one record exists in the status section before WorkMaker runs
+    Scanner s = ReplicationTable.getScanner(conn);
+    StatusSection.limit(s);
+    Assert.assertEquals(1, Iterables.size(s));
+
+    WorkMaker workMaker = new WorkMaker(conn);
+    // NOTE(review): targets are set on ReplicationTable.NAME, but WorkMaker reads config for the status record's tableId -- confirm intent
+    conn.tableOperations().setProperty(ReplicationTable.NAME, Property.TABLE_REPLICATION_TARGETS.getKey() + "remote_cluster_1", "4");
+
+    workMaker.setBatchWriter(bw);
+
+    // The status written above should need no replication, so run() must skip it before calling
+    // ServerConfiguration.getTableConfiguration, which would throw under MockAccumulo
+    workMaker.run();
+    // No work record should have been written for the skipped status record
+    s = ReplicationTable.getScanner(conn);
+    WorkSection.limit(s);
+
+    Assert.assertEquals(0, Iterables.size(s));
+  }
 }