You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by da...@apache.org on 2016/08/10 05:53:15 UTC
incubator-omid git commit: [OMID-50]: Provide an option to reduce
tso-server CPU usage. This closes #3
Repository: incubator-omid
Updated Branches:
refs/heads/master c13b3ed2f -> 4266f293c
[OMID-50]: Provide an option to reduce tso-server CPU usage
This closes #3
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/4266f293
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/4266f293
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/4266f293
Branch: refs/heads/master
Commit: 4266f293c4ae804ce80804e2ec774a2dd7bb0309
Parents: c13b3ed
Author: Daniel Dai <da...@hortonworks.com>
Authored: Tue Aug 9 22:52:58 2016 -0700
Committer: Daniel Dai <da...@hortonworks.com>
Committed: Tue Aug 9 22:52:58 2016 -0700
----------------------------------------------------------------------
.../apache/omid/transaction/TestTSOModule.java | 2 +-
.../TSOForHBaseCompactorTestModule.java | 2 +-
.../org/apache/omid/tso/DisruptorModule.java | 27 +++++++++++++++-
.../omid/tso/PersistenceProcessorImpl.java | 8 +++--
.../org/apache/omid/tso/ReplyProcessorImpl.java | 9 ++++--
.../org/apache/omid/tso/RetryProcessorImpl.java | 10 ++++--
.../java/org/apache/omid/tso/TSOModule.java | 2 +-
.../org/apache/omid/tso/TSOServerConfig.java | 19 +++++++++++
.../default-omid-server-configuration.yml | 7 +++--
.../java/org/apache/omid/tso/TSOMockModule.java | 2 +-
.../java/org/apache/omid/tso/TestPanicker.java | 4 +++
.../omid/tso/TestPersistenceProcessor.java | 33 +++++++++++---------
.../org/apache/omid/tso/TestReplyProcessor.java | 6 ++--
.../org/apache/omid/tso/TestRetryProcessor.java | 8 +++--
14 files changed, 103 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
index 623d255..67c9cba 100644
--- a/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
+++ b/hbase-client/src/test/java/org/apache/omid/transaction/TestTSOModule.java
@@ -76,7 +76,7 @@ class TestTSOModule extends AbstractModule {
install(new BatchPoolModule(config));
// Disruptor setup
- install(new DisruptorModule());
+ install(new DisruptorModule(config));
// LeaseManagement setup
install(config.getLeaseModule());
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
----------------------------------------------------------------------
diff --git a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
index b46ee73..abfe67c 100644
--- a/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
+++ b/hbase-coprocessor/src/test/java/org/apache/omid/transaction/TSOForHBaseCompactorTestModule.java
@@ -79,7 +79,7 @@ class TSOForHBaseCompactorTestModule extends AbstractModule {
install(new BatchPoolModule(config));
// DisruptorConfig
- install(new DisruptorModule());
+ install(new DisruptorModule(config));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
index 44d1ab9..2584629 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/DisruptorModule.java
@@ -18,14 +18,39 @@
package org.apache.omid.tso;
import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.YieldingWaitStrategy;
import javax.inject.Singleton;
public class DisruptorModule extends AbstractModule {
+ private final TSOServerConfig config;
+
+ public DisruptorModule(TSOServerConfig config) {
+ this.config = config;
+ }
+
@Override
protected void configure() {
-
+ switch (config.getWaitStrategyEnum()) {
+ // A low-cpu usage Disruptor configuration for using in local/test environments
+ case LOW_CPU:
+ bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BlockingWaitStrategy.class);
+ bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BlockingWaitStrategy.class);
+ bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(BlockingWaitStrategy.class);
+ break;
+ // The default high-cpu usage Disruptor configuration for getting high throughput on production environments
+ case HIGH_THROUGHPUT:
+ default:
+ bind(WaitStrategy.class).annotatedWith(Names.named("PersistenceStrategy")).to(BusySpinWaitStrategy.class);
+ bind(WaitStrategy.class).annotatedWith(Names.named("ReplyStrategy")).to(BusySpinWaitStrategy.class);
+ bind(WaitStrategy.class).annotatedWith(Names.named("RetryStrategy")).to(YieldingWaitStrategy.class);
+ break;
+ }
bind(RequestProcessor.class).to(RequestProcessorImpl.class).in(Singleton.class);
bind(PersistenceProcessor.class).to(PersistenceProcessorImpl.class).in(Singleton.class);
bind(ReplyProcessor.class).to(ReplyProcessorImpl.class).in(Singleton.class);
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 7735c6b..95d77ba 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -20,10 +20,12 @@ package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
+
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
@@ -33,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
+
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -69,6 +72,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
@Inject
PersistenceProcessorImpl(TSOServerConfig config,
+ @Named("PersistenceStrategy") WaitStrategy strategy,
CommitTable commitTable,
ObjectPool<Batch> batchPool,
Panicker panicker,
@@ -83,7 +87,7 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
this.disruptorExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
- this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, new BusySpinWaitStrategy());
+ this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, disruptorExec , SINGLE, strategy);
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
disruptor.handleEventsWithWorkerPool(handlers);
this.persistRing = disruptor.start();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index 276f803..8e50323 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -20,11 +20,13 @@ package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
-import com.lmax.disruptor.BusySpinWaitStrategy;
+import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
+
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
@@ -67,7 +69,8 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
private final Meter timestampMeter;
@Inject
- ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
+ ReplyProcessorImpl(@Named("ReplyStrategy") WaitStrategy strategy,
+ MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
// ------------------------------------------------------------------------------------------------------------
// Disruptor initialization
@@ -76,7 +79,7 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory.build());
- this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, new BusySpinWaitStrategy());
+ this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, MULTI, strategy);
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
disruptor.handleEventsWith(this);
this.replyRing = disruptor.start();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index 872daa0..6d923be 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -19,11 +19,13 @@ package org.apache.omid.tso;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.inject.name.Named;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
+
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
@@ -34,6 +36,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
+
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -69,7 +72,8 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
private final Meter noCTFoundMeter;
@Inject
- RetryProcessorImpl(MetricsRegistry metrics,
+ RetryProcessorImpl(@Named("RetryStrategy") WaitStrategy strategy,
+ MetricsRegistry metrics,
CommitTable commitTable,
ReplyProcessor replyProc,
Panicker panicker,
@@ -83,7 +87,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
this.disruptorExec = Executors.newSingleThreadExecutor(threadFactory);
- this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, new YieldingWaitStrategy());
+ this.disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, disruptorExec, SINGLE, strategy);
disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This must be before handleEventsWith()
disruptor.handleEventsWith(this);
this.retryRing = disruptor.start();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
index 9b67fa2..a7aec27 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOModule.java
@@ -48,7 +48,7 @@ class TSOModule extends AbstractModule {
install(new BatchPoolModule(config));
// Disruptor setup
- install(new DisruptorModule());
+ install(new DisruptorModule(config));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
index 489b806..3292211 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/TSOServerConfig.java
@@ -19,6 +19,7 @@ package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Module;
+
import org.apache.omid.NetworkUtils;
import org.apache.omid.YAMLUtils;
import org.apache.omid.metrics.MetricsRegistry;
@@ -38,6 +39,11 @@ public class TSOServerConfig extends SecureHBaseConfig {
private static final String CONFIG_FILE_NAME = "omid-server-configuration.yml";
private static final String DEFAULT_CONFIG_FILE_NAME = "default-omid-server-configuration.yml";
+ public static enum WAIT_STRATEGY {
+ HIGH_THROUGHPUT,
+ LOW_CPU
+ };
+
// ----------------------------------------------------------------------------------------------------------------
// Instantiation
// ----------------------------------------------------------------------------------------------------------------
@@ -72,6 +78,8 @@ public class TSOServerConfig extends SecureHBaseConfig {
private int batchPersistTimeoutInMs;
+ private String waitStrategy;
+
private String networkIfaceName = NetworkUtils.getDefaultNetworkInterface();
public int getPort() {
@@ -154,4 +162,15 @@ public class TSOServerConfig extends SecureHBaseConfig {
this.metrics = metrics;
}
+ public String getWaitStrategy() {
+ return waitStrategy;
+ }
+
+ public WAIT_STRATEGY getWaitStrategyEnum() {
+ return TSOServerConfig.WAIT_STRATEGY.valueOf(waitStrategy);
+ }
+
+ public void setWaitStrategy(String waitStrategy) {
+ this.waitStrategy = waitStrategy;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/main/resources/default-omid-server-configuration.yml
----------------------------------------------------------------------
diff --git a/tso-server/src/main/resources/default-omid-server-configuration.yml b/tso-server/src/main/resources/default-omid-server-configuration.yml
index de3faaf..017af4f 100644
--- a/tso-server/src/main/resources/default-omid-server-configuration.yml
+++ b/tso-server/src/main/resources/default-omid-server-configuration.yml
@@ -13,6 +13,10 @@
# Port reserved by the Status Oracle
port: 54758
+# Wait strategy for the Disruptor processors in TSO pipeline. Options:
+# 1) HIGH_THROUGHPUT - [Default] Use this in production deployments for maximum performance
+# 2) LOW_CPU - Use this option when testing or in deployments where saving CPU cycles is more important than throughput
+waitStrategy: HIGH_THROUGHPUT
# The number of elements reserved in the conflict map to perform conflict resolution
conflictMapSize: 100000000
# The number of Commit Table writers that persist data concurrently to the datastore. It has to be at least 2.
@@ -143,6 +147,3 @@ metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
# zkCluster: "localhost:2181"
# zkNamespace: "omid"
# metrics: !!org.apache.omid.metrics.NullMetricsProvider [ ]
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
index ec4b1ea..17fd2e0 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TSOMockModule.java
@@ -56,7 +56,7 @@ public class TSOMockModule extends AbstractModule {
install(new BatchPoolModule(config));
install(config.getLeaseModule());
- install(new DisruptorModule());
+ install(new DisruptorModule(config));
}
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
index 9b8ad29..ae89f01 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPanicker.java
@@ -30,6 +30,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.lmax.disruptor.BlockingWaitStrategy;
+
import java.io.IOException;
import static org.mockito.Matchers.any;
@@ -128,6 +130,7 @@ public class TestPanicker {
}
PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+ new BlockingWaitStrategy(),
commitTable,
batchPool,
panicker,
@@ -180,6 +183,7 @@ public class TestPanicker {
}
PersistenceProcessor proc = new PersistenceProcessorImpl(config,
+ new BlockingWaitStrategy(),
commitTable,
batchPool,
panicker,
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
index 66381ba..4779608 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestPersistenceProcessor.java
@@ -32,6 +32,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.lmax.disruptor.BlockingWaitStrategy;
+
import java.io.IOException;
import static org.mockito.Matchers.any;
@@ -114,6 +116,7 @@ public class TestPersistenceProcessor {
// Component under test
PersistenceProcessorImpl persistenceProcessor =
new PersistenceProcessorImpl(tsoConfig,
+ new BlockingWaitStrategy(),
commitTable,
mock(ObjectPool.class),
panicker,
@@ -145,7 +148,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -158,7 +161,7 @@ public class TestPersistenceProcessor {
}
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
verify(batchPool, times(1)).borrowObject(); // Called during initialization
@@ -188,7 +191,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[tsoConfig.getNumConcurrentCTWriters()];
for (int i = 0; i < tsoConfig.getNumConcurrentCTWriters(); i++) {
@@ -202,7 +205,7 @@ public class TestPersistenceProcessor {
}
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
verify(batchPool, times(1)).borrowObject(); // Called during initialization
@@ -255,7 +258,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
// Init a non-HA lease manager
VoidLeaseManager leaseManager = spy(new VoidLeaseManager(mock(TSOChannelHandler.class),
@@ -273,7 +276,7 @@ public class TestPersistenceProcessor {
}
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
// The non-ha lease manager always return true for
@@ -328,7 +331,7 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
// Test: Configure the lease manager to return true always
@@ -349,7 +352,7 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
// Test: Configure the lease manager to return true first and false later for stillInLeasePeriod
@@ -370,7 +373,7 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
// Test: Configure the lease manager to return false for stillInLeasePeriod
@@ -391,7 +394,7 @@ public class TestPersistenceProcessor {
PersistenceProcessorHandler[] handlers = configureHandlers (tsoConfig, simulatedHALeaseManager, batchPool);
// Component under test
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(tsoConfig, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
// Test: Configure the lease manager to return true first and false later for stillInLeasePeriod and raise
@@ -416,7 +419,7 @@ public class TestPersistenceProcessor {
"localhost:1234",
leaseManager,
commitTable,
- new ReplyProcessorImpl(metrics, panicker, batchPool),
+ new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool),
retryProcessor,
new RuntimeExceptionPanicker());
}
@@ -433,7 +436,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = spy(new BatchPoolModule(config).getBatchPool());
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -446,7 +449,7 @@ public class TestPersistenceProcessor {
panicker);
}
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
MonitoringContext monCtx = new MonitoringContext(metrics);
@@ -471,7 +474,7 @@ public class TestPersistenceProcessor {
ObjectPool<Batch> batchPool = new BatchPoolModule(config).getBatchPool();
- ReplyProcessor replyProcessor = new ReplyProcessorImpl(metrics, panicker, batchPool);
+ ReplyProcessor replyProcessor = new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool);
PersistenceProcessorHandler[] handlers = new PersistenceProcessorHandler[config.getNumConcurrentCTWriters()];
for (int i = 0; i < config.getNumConcurrentCTWriters(); i++) {
@@ -484,7 +487,7 @@ public class TestPersistenceProcessor {
panicker);
}
- PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, commitTable, batchPool,
+ PersistenceProcessorImpl proc = new PersistenceProcessorImpl(config, new BlockingWaitStrategy(), commitTable, batchPool,
panicker, handlers, metrics);
// Configure writer to explode with a runtime exception
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
index 3ae59df..3ead24b 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestReplyProcessor.java
@@ -31,6 +31,8 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import com.lmax.disruptor.BlockingWaitStrategy;
+
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.inOrder;
@@ -90,7 +92,7 @@ public class TestReplyProcessor {
batchPool = spy(new BatchPoolModule(tsoConfig).getBatchPool());
- replyProcessor = spy(new ReplyProcessorImpl(metrics, panicker, batchPool));
+ replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, panicker, batchPool));
}
@@ -102,7 +104,7 @@ public class TestReplyProcessor {
public void testBadFormedPackageThrowsException() throws Exception {
// We need an instance throwing exceptions for this test
- replyProcessor = spy(new ReplyProcessorImpl(metrics, new RuntimeExceptionPanicker(), batchPool));
+ replyProcessor = spy(new ReplyProcessorImpl(new BlockingWaitStrategy(), metrics, new RuntimeExceptionPanicker(), batchPool));
// Prepare test batch
Batch batch = batchPool.borrowObject();
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/4266f293/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
----------------------------------------------------------------------
diff --git a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
index eecab7a..54302d0 100644
--- a/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
+++ b/tso-server/src/test/java/org/apache/omid/tso/TestRetryProcessor.java
@@ -18,6 +18,8 @@
package org.apache.omid.tso;
import com.google.common.base.Optional;
+import com.lmax.disruptor.YieldingWaitStrategy;
+
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
@@ -70,7 +72,7 @@ public class TestRetryProcessor {
ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
// The element to test
- RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
+ RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
// Test we'll reply with an abort for a retry request when the start timestamp IS NOT in the commit table
retryProc.disambiguateRetryRequestHeuristically(NON_EXISTING_ST_TX, channel, new MonitoringContext(metrics));
@@ -86,7 +88,7 @@ public class TestRetryProcessor {
ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
// The element to test
- RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
+ RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
// Test we'll reply with a commit for a retry request when the start timestamp IS in the commit table
commitTable.getWriter().addCommittedTransaction(ST_TX_1, CT_TX_1);
@@ -120,7 +122,7 @@ public class TestRetryProcessor {
ObjectPool<Batch> batchPool = new BatchPoolModule(new TSOServerConfig()).getBatchPool();
// The element to test
- RetryProcessor retryProc = new RetryProcessorImpl(metrics, commitTable, replyProc, panicker, batchPool);
+ RetryProcessor retryProc = new RetryProcessorImpl(new YieldingWaitStrategy(), metrics, commitTable, replyProc, panicker, batchPool);
// Test we return an Abort to a retry request when the transaction id IS in the commit table BUT invalidated
retryProc.disambiguateRetryRequestHeuristically(ST_TX_1, channel, new MonitoringContext(metrics));