You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by da...@apache.org on 2016/08/10 05:53:15 UTC

incubator-omid git commit: [OMID-50]: Provide an option to reduce tso-server CPU usage. This closes #3

Repository: incubator-omid
Updated Branches:
  refs/heads/master c13b3ed2f -> 4266f293c


[OMID-50]: Provide an option to reduce tso-server CPU usage
This closes #3


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/4266f293
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/4266f293
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/4266f293

Branch: refs/heads/master
Commit: 4266f293c4ae804ce80804e2ec774a2dd7bb0309
Parents: c13b3ed
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Aug 9 22:52:58 2016 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Aug 9 22:52:58 2016 -0700

----------------------------------------------------------------------
 .../apache/omid/transaction/TestTSOModule.java  |  2 +-
 .../TSOForHBaseCompactorTestModule.java         |  2 +-
 .../org/apache/omid/tso/DisruptorModule.java    | 27 +++++++++++++++-
 .../omid/tso/PersistenceProcessorImpl.java      |  8 +++--
 .../org/apache/omid/tso/ReplyProcessorImpl.java |  9 ++++--
 .../org/apache/omid/tso/RetryProcessorImpl.java | 10 ++++--
 .../java/org/apache/omid/tso/TSOModule.java     |  2 +-
 .../org/apache/omid/tso/TSOServerConfig.java    | 19 +++++++++++
 .../default-omid-server-configuration.yml       |  7 +++--
 .../java/org/apache/omid/tso/TSOMockModule.java |  2 +-
 .../java/org/apache/omid/tso/TestPanicker.java  |  4 +++
 .../omid/tso/TestPersistenceProcessor.java      | 33 +++++++++++---------
 .../org/apache/omid/tso/TestReplyProcessor.java |  6 ++--
 .../org/apache/omid/tso/TestRetryProcessor.java |  8 +++--
 14 files changed, 103 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 623d255..67c9cba 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -76,7 +76,7 @@ class TestTSOModule extends AbstractModule {
         install(new BatchPoolModule(config));
 
         // Disruptor setup
-        install(new DisruptorModule());
+        install(new DisruptorModule(config));
 
         // LeaseManagement setup
         install(config.getLeaseModule());

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index b46ee73..abfe67c 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -79,7 +79,7 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
 
         install(new BatchPoolModule(config));
         // DisruptorConfig
-        install(new DisruptorModule());
+        install(new DisruptorModule(config));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
index 44d1ab9..2584629 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
@@ -18,14 +18,39 @@
 package org.apache.omid.tso;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.YieldingWaitStrategy;
 
 import javax.inject.Singleton;
 
 public class DisruptorModule extends AbstractModule {
 
+    private final TSOServerConfig config;
+
+    public DisruptorModule(TSOServerConfig config) {
+        this.config = config;
+    }
+
     @Override
     protected void configure() {
-
+        switch (config.getWaitStrategyEnum()) {
+        // A low-CPU-usage Disruptor configuration for use in local/test environments
+        case LOW_CPU:
+             bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class);
+             bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class);
+             bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class);
+             break;
+        // The default high-CPU-usage Disruptor configuration for getting high throughput in production environments
+        case HIGH_THROUGHPUT:
+        default:
+             bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class);
+             bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class);
+             bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class);
+             break;
+        }
         bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class);
         bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
         bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 7735c6b..95d77ba 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -20,10 +20,12 @@ package org.apache.omid.tso;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Objects;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.google.inject.name.Named;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
+
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -33,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+
 import java.io.IOException;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
@@ -69,6 +72,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
 
     @Inject
     PersistenceProcessorImpl(TSOServerConfig config,
+                             @Named("PersistenceStrategy") WaitStrategy strategy,
                              CommitTable commitTable,
                              ObjectPool<Batch> batchPool,
                              Panicker panicker,
@@ -83,7 +87,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
         ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
         this.disruptorExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
 
-        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, new BusySpinWaitStrategy());
+        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, strategy);
         disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
         disruptor.handleEventsWithWorkerPool(handlers);
         this.persistRing = disruptor.start();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 276f803..8e50323 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -20,11 +20,13 @@ package org.apache.omid.tso;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
-import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.google.inject.name.Named;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
+
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.metrics.Meter;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -67,7 +69,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
     private final Meter timestampMeter;
 
     @Inject
-    ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
+    ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
+            MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
 
         // ------------------------------------------------------------------------------------------------------------
         // Disruptor initialization
@@ -76,7 +79,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
         ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
         this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build());
 
-        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, new BusySpinWaitStrategy());
+        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy);
         disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
         disruptor.handleEventsWith(this);
         this.replyRing = disruptor.start();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 872daa0..6d923be 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -19,11 +19,13 @@ package org.apache.omid.tso;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.name.Named;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.Disruptor;
+
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
+
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -69,7 +72,8 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
     private final Meter noCTFoundMeter;
 
     @Inject
-    RetryProcessorImpl(MetricsRegistry metrics,
+    RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy strategy,
+                       MetricsRegistry metrics,
                        CommitTable commitTable,
                        ReplyProcessor replyProc,
                        Panicker panicker,
@@ -83,7 +87,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
         ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
         this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
 
-        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, new YieldingWaitStrategy());
+        this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, strategy);
         disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
         disruptor.handleEventsWith(this);
         this.retryRing = disruptor.start();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index 9b67fa2..a7aec27 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -48,7 +48,7 @@ class TSOModule extends AbstractModule {
 
         install(new BatchPoolModule(config));
         // Disruptor setup
-        install(new DisruptorModule());
+        install(new DisruptorModule(config));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 489b806..3292211 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -19,6 +19,7 @@ package org.apache.omid.tso;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Module;
+
 import org.apache.omid.NetworkUtils;
 import org.apache.omid.YAMLUtils;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -38,6 +39,11 @@ public class TSOServerConfig extends SecureHBaseConfig {
     private static final String CONFIG_FILE_NAME = "omid-server-configuration.yml";
     private static final String DEFAULT_CONFIG_FILE_NAME = "default-omid-server-configuration.yml";
 
+    public static enum WAIT_STRATEGY {
+        HIGH_THROUGHPUT,
+        LOW_CPU
+    };
+
     // ----------------------------------------------------------------------------------------------------------------
     // Instantiation
     // ----------------------------------------------------------------------------------------------------------------
@@ -72,6 +78,8 @@ public class TSOServerConfig extends SecureHBaseConfig {
 
     private int batchPersistTimeoutInMs;
 
+    private String waitStrategy;
+
     private String networkIfaceName = NetworkUtils.getDefaultNetworkInterface();
 
     public int getPort() {
@@ -154,4 +162,15 @@ public class TSOServerConfig extends SecureHBaseConfig {
         this.metrics = metrics;
     }
 
+    public String getWaitStrategy() {
+        return waitStrategy;
+    }
+
+    public WAIT_STRATEGY getWaitStrategyEnum() {
+        return TSOServerConfig.WAIT_STRATEGY.valueOf(waitStrategy);
+    }
+
+    public void setWaitStrategy(String waitStrategy) {
+        this.waitStrategy = waitStrategy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index de3faaf..017af4f 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -13,6 +13,10 @@
 
 # Port reserved by the Status Oracle
 port: 54758
+# Wait strategy for the Disruptor processors in the TSO pipeline. Options:
+# 1) HIGH_THROUGHPUT - [Default] Use this in production deployments for maximum performance
+# 2) LOW_CPU - Use this option when testing or in deployments where saving CPU cycles is more important than throughput
+waitStrategy: HIGH_THROUGHPUT
 # The number of elements reserved in the conflict map to perform conflict resolution
 conflictMapSize: 100000000
 # The number of Commit Table writers that persist data concurrently to the datastore. It has to be at least 2.
@@ -143,6 +147,3 @@ metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
 #     zkCluster: "localhost:2181"
 #     zkNamespace: "omid"
 # metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index ec4b1ea..17fd2e0 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -56,7 +56,7 @@ public class TSOMockModule extends AbstractModule {
 
         install(new BatchPoolModule(config));
         install(config.getLeaseModule());
-        install(new DisruptorModule());
+        install(new DisruptorModule(config));
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 9b8ad29..ae89f01 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -30,6 +30,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.lmax.disruptor.BlockingWaitStrategy;
+
 import java.io.IOException;
 
 import static org.mockito.Matchers.any;
@@ -128,6 +130,7 @@ public class TestPanicker {
         }
 
         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+                                                                 new BlockingWaitStrategy(),
                                                                  commitTable,
                                                                  batchPool,
                                                                  panicker,
@@ -180,6 +183,7 @@ public class TestPanicker {
         }
 
         PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+                                                                 new BlockingWaitStrategy(),
                                                                  commitTable,
                                                                  batchPool,
                                                                  panicker,

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 66381ba..4779608 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -32,6 +32,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.lmax.disruptor.BlockingWaitStrategy;
+
 import java.io.IOException;
 
 import static org.mockito.Matchers.any;
@@ -114,6 +116,7 @@ public class TestPersistenceProcessor {
         // Component under test
         PersistenceProcessorImpl persistenceProcessor =
                 new PersistenceProcessorImpl(tsoConfig,
+                                             new BlockingWaitStrategy(),
                                              commitTable,
                                              mock(ObjectPool.class),
                                              panicker,
@@ -145,7 +148,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -158,7 +161,7 @@ public class TestPersistenceProcessor {
         }
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
@@ -188,7 +191,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
         for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -202,7 +205,7 @@ public class TestPersistenceProcessor {
         }
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         verify(batchPool, times(1)).borrowObject(); // Called during initialization
@@ -255,7 +258,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
 
         // Init a non-HA lease manager
         VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
@@ -273,7 +276,7 @@ public class TestPersistenceProcessor {
         }
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         // The non-ha lease manager always return true for
@@ -328,7 +331,7 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return true always
@@ -349,7 +352,7 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
@@ -370,7 +373,7 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return false for stillInLeasePeriod
@@ -391,7 +394,7 @@ public class TestPersistenceProcessor {
         PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
 
         // Component under test
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         // Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
@@ -416,7 +419,7 @@ public class TestPersistenceProcessor {
                                                           "localhost:1234",
                                                           leaseManager,
                                                           commitTable,
-                                                          new ReplyProcessorImpl(metrics, panicker, batchPool),
+                                                          new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool),
                                                           retryProcessor,
                                                           new RuntimeExceptionPanicker());
         }
@@ -433,7 +436,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -446,7 +449,7 @@ public class TestPersistenceProcessor {
                                                           panicker);
         }
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         MonitoringContext monCtx = new MonitoringContext(metrics);
@@ -471,7 +474,7 @@ public class TestPersistenceProcessor {
 
         ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
 
-        ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+        ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
 
         PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
         for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -484,7 +487,7 @@ public class TestPersistenceProcessor {
                                                           panicker);
         }
 
-        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+        PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
                                                                      panicker, handlers, metrics);
 
         // Configure writer to explode with a runtime exception

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 3ae59df..3ead24b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -31,6 +31,8 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.lmax.disruptor.BlockingWaitStrategy;
+
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.inOrder;
@@ -90,7 +92,7 @@ public class TestReplyProcessor {
 
         batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
 
-        replyProcessor = spy(new ReplyProcessorImpl(metrics, panicker, batchPool));
+        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool));
 
     }
 
@@ -102,7 +104,7 @@ public class TestReplyProcessor {
     public void testBadFormedPackageThrowsException() throws Exception {
 
         // We need an instance throwing exceptions for this test
-        replyProcessor = spy(new ReplyProcessorImpl(metrics, new RuntimeExceptionPanicker(), batchPool));
+        replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool));
 
         // Prepare test batch
         Batch batch = batchPool.borrowObject();

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index eecab7a..54302d0 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -18,6 +18,8 @@
 package org.apache.omid.tso;
 
 import com.google.common.base.Optional;
+import com.lmax.disruptor.YieldingWaitStrategy;
+
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
@@ -70,7 +72,7 @@ public class TestRetryProcessor {
         ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
 
         // The element to test
-        RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
+        RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
         retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
@@ -86,7 +88,7 @@ public class TestRetryProcessor {
         ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
 
         // The element to test
-        RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
+        RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
         commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
@@ -120,7 +122,7 @@ public class TestRetryProcessor {
         ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
 
         // The element to test
-        RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
+        RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
 
         // Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
         retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));