You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/29 02:59:02 UTC

[1/3] git commit: ACCUMULO-378 Allow for dynamic reconfiguration of the WorkAssigner impl

Repository: accumulo
Updated Branches:
  refs/heads/ACCUMULO-378 58fbf1438 -> 49fc9855f


ACCUMULO-378 Allow for dynamic reconfiguration of the WorkAssigner impl


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db10cfe2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db10cfe2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db10cfe2

Branch: refs/heads/ACCUMULO-378
Commit: db10cfe26823f5a31b516070521f54e5b890fb7b
Parents: 58fbf14
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 16:55:39 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 16:55:39 2014 -0400

----------------------------------------------------------------------
 .../accumulo/master/replication/WorkDriver.java | 33 +++++++++++++-------
 1 file changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/db10cfe2/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index 00b0480..8c3e3e3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -44,20 +44,28 @@ public class WorkDriver extends Daemon {
     this.master = master;
     this.conn = conn;
     this.conf = master.getConfiguration().getConfiguration();
+    configureWorkAssigner();
+  }
 
+  protected void configureWorkAssigner() {
     String workAssignerClass = conf.get(Property.REPLICATION_WORK_ASSIGNER);
-    try {
-      Class<?> clz = Class.forName(workAssignerClass);
-      Class<? extends WorkAssigner> workAssignerClz = clz.asSubclass(WorkAssigner.class);
-      this.assigner = workAssignerClz.newInstance();
-    } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-      log.error("Could not instantiate configured work assigner {}", workAssignerClass, e);
-      throw new RuntimeException(e);
-    }
 
-    this.assigner.configure(conf, conn);
-    this.assignerImplName = assigner.getClass().getName();
-    this.setName(assigner.getName());
+    if (null == assigner || !assigner.getClass().getName().equals(workAssignerClass)) {
+      log.info("Initializing work assigner implementation of {}", workAssignerClass);
+
+      try {
+        Class<?> clz = Class.forName(workAssignerClass);
+        Class<? extends WorkAssigner> workAssignerClz = clz.asSubclass(WorkAssigner.class);
+        this.assigner = workAssignerClz.newInstance();
+      } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+        log.error("Could not instantiate configured work assigner {}", workAssignerClass, e);
+        throw new RuntimeException(e);
+      }
+  
+      this.assigner.configure(conf, conn);
+      this.assignerImplName = assigner.getClass().getName();
+      this.setName(assigner.getName());
+    }
   }
 
   /*
@@ -90,6 +98,9 @@ public class WorkDriver extends Daemon {
       long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
       log.debug("Sleeping {} ms before next work assignment", sleepTime);
       UtilWaitThread.sleep(sleepTime);
+
+      // After each loop, make sure that the WorkAssigner implementation didn't change
+      configureWorkAssigner();
     }
   }
 }


[2/3] git commit: ACCUMULO-378 Resize the threadpool used for sending data by checking the configuration periodically

Posted by el...@apache.org.
ACCUMULO-378 Resize the threadpool used for sending data by checking the configuration periodically

Use the SimpleTimer to just schedule a check of the configuration
to see what the value is for the maximum size of the threadpool which
is used with the DistributedWorkQueue and ReplicationProcessor.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0ff0e021
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0ff0e021
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0ff0e021

Branch: refs/heads/ACCUMULO-378
Commit: 0ff0e021d3fc95794137dfeb3f6e1335b61b0a16
Parents: db10cfe
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 17:23:51 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 17:23:51 2014 -0400

----------------------------------------------------------------------
 .../org/apache/accumulo/tserver/TabletServer.java   | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/0ff0e021/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 567b2ad..e4c7ef9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3241,10 +3241,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
     log.info("Started replication service at " + replicationAddress);
 
     // Start the pool to handle outgoing replications
-    ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
+    final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
     replWorker.setExecutor(replicationThreadPool);
     replWorker.run();
 
+    // Check the configuration value for the size of the pool and, if changed, resize the pool, every 5 seconds);
+    final AccumuloConfiguration aconf = getSystemConfiguration();
+    Runnable replicationWorkThreadPoolResizer = new Runnable() {
+      @Override
+      public void run() {
+        int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS);
+        if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) {
+          log.info("Resizing thread pool for sending replication work from " + replicationThreadPool.getMaximumPoolSize() + " to " + maxPoolSize);
+          replicationThreadPool.setMaximumPoolSize(maxPoolSize);
+        }
+      }
+    };
+    SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
+
     try {
       OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
       // Do this because interface not in same package.


[3/3] git commit: ACCUMULO-378 Can't use '_' as the row separator for Order records.

Posted by el...@apache.org.
ACCUMULO-378 Can't use '_' as the row separator for Order records.

The ULongLexicoder *might* create bytes that actually equal the '_', which
will mess up the splitting logic of the row key. Switch it to a \x00 instead
and find the last instance of it.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/49fc9855
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/49fc9855
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/49fc9855

Branch: refs/heads/ACCUMULO-378
Commit: 49fc9855f996ae0f5b3cc20e03e77ea8f707d640
Parents: 0ff0e02
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 20:07:52 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 20:58:43 2014 -0400

----------------------------------------------------------------------
 .../core/replication/ReplicationSchema.java     | 30 ++++++++++++++++----
 .../core/replication/ReplicationSchemaTest.java | 12 ++++++++
 .../accumulo/master/replication/WorkDriver.java |  6 +++-
 3 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 8699bd2..ab350e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -153,11 +153,11 @@ public class ReplicationSchema {
    * Holds the order in which files needed for replication were closed. The intent is to be able to guarantee that files which were closed earlier were
    * replicated first and we don't replay data in the wrong order on our peers
    * <p>
-   * <code>encodedTimeOfClosure_hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code>
+   * <code>encodedTimeOfClosure\x00hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code>
    */
   public static class OrderSection {
     public static final Text NAME = new Text("order");
-    public static final String ROW_SEPARATOR = "_";
+    public static final Text ROW_SEPARATOR = new Text(new byte[]{0});
     private static final ULongLexicoder longEncoder = new ULongLexicoder();
 
     /**
@@ -218,10 +218,10 @@ public class ReplicationSchema {
       Path p = new Path(file);
       String pathString = p.toUri().toString();
 
-      log.info("Normalized {} into {}", file, pathString);
+      log.trace("Normalized {} into {}", file, pathString);
 
       // Append the file as a suffix to the row
-      row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.length());
+      row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.getLength());
 
       // Make the mutation and add the column update
       return new Mutation(row);
@@ -249,7 +249,16 @@ public class ReplicationSchema {
 
     public static long getTimeClosed(Key k, Text buff) {
       k.getRow(buff);
-      int offset = buff.find(ROW_SEPARATOR);
+      int offset = 0;
+      // find the last offset
+      while (true) {
+        int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1);
+        if (-1 == nextOffset) {
+          break;
+        }
+        offset = nextOffset;
+      }
+
       if (-1 == offset) {
         throw new IllegalArgumentException("Row does not contain expected separator for OrderSection");
       }
@@ -266,7 +275,16 @@ public class ReplicationSchema {
 
     public static String getFile(Key k, Text buff) {
       k.getRow(buff);
-      int offset = buff.find(ROW_SEPARATOR);
+      int offset = 0;
+      // find the last offset
+      while (true) {
+        int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1);
+        if (-1 == nextOffset) {
+          break;
+        }
+        offset = nextOffset;
+      }
+
       if (-1 == offset) {
         throw new IllegalArgumentException("Row does not contain expected separator for OrderSection");
       }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
index d321153..3822641 100644
--- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
@@ -119,4 +119,16 @@ public class ReplicationSchemaTest {
     Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
     Assert.assertEquals(now, OrderSection.getTimeClosed(k, buff));
   }
+
+  @Test
+  public void separatorDoesntInterferWithSplit() {
+    Text buff = new Text();
+    // Cycle through 2*128 values
+    for (long i = 1; i < 258; i++) {
+      Mutation m = OrderSection.createMutation("/accumulo/file", i);
+      Key k = new Key(new Text(m.getRow()));
+      Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
+      Assert.assertEquals(i, OrderSection.getTimeClosed(k, buff));
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index 8c3e3e3..43a74a8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -93,7 +93,11 @@ public class WorkDriver extends Daemon {
 
     while (master.stillMaster()) {
       // Assign the work using the configured implementation
-      assigner.assignWork();
+      try {
+        assigner.assignWork();
+      } catch (Exception e) {
+        log.error("Error while assigning work", e);
+      }
 
       long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
       log.debug("Sleeping {} ms before next work assignment", sleepTime);