You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2014/05/29 02:59:02 UTC
[1/3] git commit: ACCUMULO-378 Allow for dynamic reconfiguration of
the WorkAssigner impl
Repository: accumulo
Updated Branches:
refs/heads/ACCUMULO-378 58fbf1438 -> 49fc9855f
ACCUMULO-378 Allow for dynamic reconfiguration of the WorkAssigner impl
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/db10cfe2
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/db10cfe2
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/db10cfe2
Branch: refs/heads/ACCUMULO-378
Commit: db10cfe26823f5a31b516070521f54e5b890fb7b
Parents: 58fbf14
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 16:55:39 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 16:55:39 2014 -0400
----------------------------------------------------------------------
.../accumulo/master/replication/WorkDriver.java | 33 +++++++++++++-------
1 file changed, 22 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/db10cfe2/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index 00b0480..8c3e3e3 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -44,20 +44,28 @@ public class WorkDriver extends Daemon {
this.master = master;
this.conn = conn;
this.conf = master.getConfiguration().getConfiguration();
+ configureWorkAssigner();
+ }
+ protected void configureWorkAssigner() {
String workAssignerClass = conf.get(Property.REPLICATION_WORK_ASSIGNER);
- try {
- Class<?> clz = Class.forName(workAssignerClass);
- Class<? extends WorkAssigner> workAssignerClz = clz.asSubclass(WorkAssigner.class);
- this.assigner = workAssignerClz.newInstance();
- } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
- log.error("Could not instantiate configured work assigner {}", workAssignerClass, e);
- throw new RuntimeException(e);
- }
- this.assigner.configure(conf, conn);
- this.assignerImplName = assigner.getClass().getName();
- this.setName(assigner.getName());
+ if (null == assigner || !assigner.getClass().getName().equals(workAssignerClass)) {
+ log.info("Initializing work assigner implementation of {}", workAssignerClass);
+
+ try {
+ Class<?> clz = Class.forName(workAssignerClass);
+ Class<? extends WorkAssigner> workAssignerClz = clz.asSubclass(WorkAssigner.class);
+ this.assigner = workAssignerClz.newInstance();
+ } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+ log.error("Could not instantiate configured work assigner {}", workAssignerClass, e);
+ throw new RuntimeException(e);
+ }
+
+ this.assigner.configure(conf, conn);
+ this.assignerImplName = assigner.getClass().getName();
+ this.setName(assigner.getName());
+ }
}
/*
@@ -90,6 +98,9 @@ public class WorkDriver extends Daemon {
long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
log.debug("Sleeping {} ms before next work assignment", sleepTime);
UtilWaitThread.sleep(sleepTime);
+
+ // After each loop, make sure that the WorkAssigner implementation didn't change
+ configureWorkAssigner();
}
}
}
[2/3] git commit: ACCUMULO-378 Resize the threadpool used for sending
data by checking the configuration periodically
Posted by el...@apache.org.
ACCUMULO-378 Resize the threadpool used for sending data by checking the configuration periodically
Use the SimpleTimer to schedule a periodic check of the configured
maximum size of the threadpool that is used with the
DistributedWorkQueue and ReplicationProcessor, resizing the pool when the value changes.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/0ff0e021
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/0ff0e021
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/0ff0e021
Branch: refs/heads/ACCUMULO-378
Commit: 0ff0e021d3fc95794137dfeb3f6e1335b61b0a16
Parents: db10cfe
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 17:23:51 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 17:23:51 2014 -0400
----------------------------------------------------------------------
.../org/apache/accumulo/tserver/TabletServer.java | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/0ff0e021/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
----------------------------------------------------------------------
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 567b2ad..e4c7ef9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -3241,10 +3241,24 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu
log.info("Started replication service at " + replicationAddress);
// Start the pool to handle outgoing replications
- ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
+ final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
replWorker.setExecutor(replicationThreadPool);
replWorker.run();
+ // Periodically check the configured size of the pool and resize the pool if the value changed (first check after 10 seconds, then every 30 seconds)
+ final AccumuloConfiguration aconf = getSystemConfiguration();
+ Runnable replicationWorkThreadPoolResizer = new Runnable() {
+ @Override
+ public void run() {
+ int maxPoolSize = aconf.getCount(Property.REPLICATION_WORKER_THREADS);
+ if (replicationThreadPool.getMaximumPoolSize() != maxPoolSize) {
+ log.info("Resizing thread pool for sending replication work from " + replicationThreadPool.getMaximumPoolSize() + " to " + maxPoolSize);
+ replicationThreadPool.setMaximumPoolSize(maxPoolSize);
+ }
+ }
+ };
+ SimpleTimer.getInstance(aconf).schedule(replicationWorkThreadPoolResizer, 10000, 30000);
+
try {
OBJECT_NAME = new ObjectName("accumulo.server.metrics:service=TServerInfo,name=TabletServerMBean,instance=" + Thread.currentThread().getName());
// Do this because interface not in same package.
[3/3] git commit: ACCUMULO-378 Can't use '_' as the row separator for
Order records.
Posted by el...@apache.org.
ACCUMULO-378 Can't use '_' as the row separator for Order records.
The ULongLexicoder *might* produce bytes that equal '_', which would
break the logic that splits the row key. Switch the separator to \x00 instead
and split on the last occurrence of it in the row.
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/49fc9855
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/49fc9855
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/49fc9855
Branch: refs/heads/ACCUMULO-378
Commit: 49fc9855f996ae0f5b3cc20e03e77ea8f707d640
Parents: 0ff0e02
Author: Josh Elser <el...@apache.org>
Authored: Wed May 28 20:07:52 2014 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Wed May 28 20:58:43 2014 -0400
----------------------------------------------------------------------
.../core/replication/ReplicationSchema.java | 30 ++++++++++++++++----
.../core/replication/ReplicationSchemaTest.java | 12 ++++++++
.../accumulo/master/replication/WorkDriver.java | 6 +++-
3 files changed, 41 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
index 8699bd2..ab350e6 100644
--- a/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
+++ b/core/src/main/java/org/apache/accumulo/core/replication/ReplicationSchema.java
@@ -153,11 +153,11 @@ public class ReplicationSchema {
* Holds the order in which files needed for replication were closed. The intent is to be able to guarantee that files which were closed earlier were
* replicated first and we don't replay data in the wrong order on our peers
* <p>
- * <code>encodedTimeOfClosure_hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code>
+ * <code>encodedTimeOfClosure\x00hdfs://localhost:8020/accumulo/wal/tserver+port/WAL order:source_table_id [] -> Status Protobuf</code>
*/
public static class OrderSection {
public static final Text NAME = new Text("order");
- public static final String ROW_SEPARATOR = "_";
+ public static final Text ROW_SEPARATOR = new Text(new byte[]{0});
private static final ULongLexicoder longEncoder = new ULongLexicoder();
/**
@@ -218,10 +218,10 @@ public class ReplicationSchema {
Path p = new Path(file);
String pathString = p.toUri().toString();
- log.info("Normalized {} into {}", file, pathString);
+ log.trace("Normalized {} into {}", file, pathString);
// Append the file as a suffix to the row
- row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.length());
+ row.append((ROW_SEPARATOR + pathString).getBytes(), 0, pathString.length() + ROW_SEPARATOR.getLength());
// Make the mutation and add the column update
return new Mutation(row);
@@ -249,7 +249,16 @@ public class ReplicationSchema {
public static long getTimeClosed(Key k, Text buff) {
k.getRow(buff);
- int offset = buff.find(ROW_SEPARATOR);
+ int offset = 0;
+ // find the offset of the last separator in the row
+ while (true) {
+ int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1);
+ if (-1 == nextOffset) {
+ break;
+ }
+ offset = nextOffset;
+ }
+
if (-1 == offset) {
throw new IllegalArgumentException("Row does not contain expected separator for OrderSection");
}
@@ -266,7 +275,16 @@ public class ReplicationSchema {
public static String getFile(Key k, Text buff) {
k.getRow(buff);
- int offset = buff.find(ROW_SEPARATOR);
+ int offset = 0;
+ // find the offset of the last separator in the row
+ while (true) {
+ int nextOffset = buff.find(ROW_SEPARATOR.toString(), offset + 1);
+ if (-1 == nextOffset) {
+ break;
+ }
+ offset = nextOffset;
+ }
+
if (-1 == offset) {
throw new IllegalArgumentException("Row does not contain expected separator for OrderSection");
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
index d321153..3822641 100644
--- a/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/replication/ReplicationSchemaTest.java
@@ -119,4 +119,16 @@ public class ReplicationSchemaTest {
Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
Assert.assertEquals(now, OrderSection.getTimeClosed(k, buff));
}
+
+ @Test
+ public void separatorDoesntInterferWithSplit() {
+ Text buff = new Text();
+ // Cycle through slightly more than 2*128 values (1 through 257)
+ for (long i = 1; i < 258; i++) {
+ Mutation m = OrderSection.createMutation("/accumulo/file", i);
+ Key k = new Key(new Text(m.getRow()));
+ Assert.assertEquals("/accumulo/file", OrderSection.getFile(k, buff));
+ Assert.assertEquals(i, OrderSection.getTimeClosed(k, buff));
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/accumulo/blob/49fc9855/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
----------------------------------------------------------------------
diff --git a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
index 8c3e3e3..43a74a8 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/replication/WorkDriver.java
@@ -93,7 +93,11 @@ public class WorkDriver extends Daemon {
while (master.stillMaster()) {
// Assign the work using the configured implementation
- assigner.assignWork();
+ try {
+ assigner.assignWork();
+ } catch (Exception e) {
+ log.error("Error while assigning work", e);
+ }
long sleepTime = conf.getTimeInMillis(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP);
log.debug("Sleeping {} ms before next work assignment", sleepTime);