You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/06/03 21:27:47 UTC
[20/23] incubator-omid git commit: [OMID-37] Improve creation of Disruptor components
[OMID-37] Improve creation of Disruptor components
Use the Disruptor API to create Omid processor components
Change-Id: I5d164bead9de6536495f08578cf6a02171b5d69f
Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/9722e401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/9722e401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/9722e401
Branch: refs/heads/master
Commit: 9722e4013cd2139395dea5b6315dfaac59e6b84b
Parents: 3236900
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Wed May 25 21:58:38 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 25 21:59:26 2016 -0700
----------------------------------------------------------------------
.../omid/tso/PersistenceProcessorImpl.java | 37 ++++++++---------
.../org/apache/omid/tso/ReplyProcessorImpl.java | 32 ++++++++-------
.../apache/omid/tso/RequestProcessorImpl.java | 42 +++++++++++---------
.../org/apache/omid/tso/RetryProcessorImpl.java | 36 +++++++++--------
4 files changed, 80 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 4fe21d2..5204332 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -24,6 +24,7 @@ import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.WorkerPool;
+import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.metrics.MetricsRegistry;
@@ -39,6 +40,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
import static org.apache.omid.metrics.MetricsUtils.name;
import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
@@ -47,8 +49,6 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
- private static final long INITIAL_LWM_VALUE = -1L;
-
private final RingBuffer<PersistBatchEvent> persistRing;
private final ObjectPool<Batch> batchPool;
@@ -73,29 +73,30 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
MetricsRegistry metrics)
throws Exception {
+ // ------------------------------------------------------------------------------------------------------------
+ // Disruptor initialization
+ // ------------------------------------------------------------------------------------------------------------
+
+ ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
+ ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
+
+ Disruptor<PersistBatchEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, requestExec , SINGLE, new BusySpinWaitStrategy());
+ disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
+ disruptor.handleEventsWithWorkerPool(handlers);
+ this.persistRing = disruptor.start();
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Attribute initialization
+ // ------------------------------------------------------------------------------------------------------------
+
this.metrics = metrics;
this.lowWatermarkWriter = commitTable.getWriter();
this.batchSequence = 0L;
this.batchPool = batchPool;
this.currentBatch = batchPool.borrowObject();
-
// Low Watermark writer
ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
- lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
-
- // Disruptor configuration
- this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
-
- ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
- ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(),
- threadFactory.build());
-
- WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
- persistRing.newBarrier(),
- new FatalExceptionHandler(panicker),
- handlers);
- this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
- persistProcessor.start(requestExec);
+ this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
// Metrics config
this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index ab257d6..d5e4c00 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -20,12 +20,11 @@ package org.apache.omid.tso;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
-import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
+import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
@@ -38,10 +37,10 @@ import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
import static com.codahale.metrics.MetricRegistry.name;
+import static com.lmax.disruptor.dsl.ProducerType.MULTI;
class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
@@ -65,28 +64,31 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
@Inject
ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
- this.batchPool = batchPool;
-
- this.nextIDToHandle.set(0);
+ // ------------------------------------------------------------------------------------------------------------
+ // Disruptor initialization
+ // ------------------------------------------------------------------------------------------------------------
- this.replyRing = RingBuffer.createMultiProducer(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
+ ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
+ ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory.build());
- SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
- BatchEventProcessor<ReplyBatchEvent> replyProcessor = new BatchEventProcessor<>(replyRing, replySequenceBarrier, this);
- replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
+ Disruptor<ReplyProcessorImpl.ReplyBatchEvent> disruptor = new Disruptor<>(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, requestExec, MULTI, new BusySpinWaitStrategy());
+ disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
+ disruptor.handleEventsWith(this);
+ this.replyRing = disruptor.start();
- replyRing.addGatingSequences(replyProcessor.getSequence());
-
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d").build();
- ExecutorService replyExec = Executors.newSingleThreadExecutor(threadFactory);
- replyExec.submit(replyProcessor);
+ // ------------------------------------------------------------------------------------------------------------
+ // Attribute initialization
+ // ------------------------------------------------------------------------------------------------------------
+ this.batchPool = batchPool;
+ this.nextIDToHandle.set(0);
this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
}
});
+ // Metrics config
this.abortMeter = metrics.meter(name("tso", "aborts"));
this.commitMeter = metrics.meter(name("tso", "commits"));
this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 94a3df5..b945eaf 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -25,6 +25,7 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
import com.lmax.disruptor.TimeoutHandler;
+import com.lmax.disruptor.dsl.Disruptor;
import org.apache.omid.metrics.MetricsRegistry;
import org.apache.omid.tso.TSOStateManager.TSOState;
import org.jboss.netty.channel.Channel;
@@ -38,8 +39,13 @@ import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import static com.lmax.disruptor.dsl.ProducerType.MULTI;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
+
class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
@@ -59,29 +65,29 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
TSOServerConfig config)
throws IOException {
- this.metrics = metrics;
+ // ------------------------------------------------------------------------------------------------------------
+ // Disruptor initialization
+ // ------------------------------------------------------------------------------------------------------------
+
+ TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
+ ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory);
+
+ Disruptor<RequestEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, requestExec, MULTI, timeoutStrategy);
+ disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This should be before the event handler
+ disruptor.handleEventsWith(this);
+ this.requestRing = disruptor.start();
+ // ------------------------------------------------------------------------------------------------------------
+ // Attribute initialization
+ // ------------------------------------------------------------------------------------------------------------
+
+ this.metrics = metrics;
this.persistProc = persistProc;
this.timestampOracle = timestampOracle;
-
this.hashmap = new CommitHashMap(config.getMaxItems());
- final TimeoutBlockingWaitStrategy timeoutStrategy
- = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
-
- // Set up the disruptor thread
- requestRing = RingBuffer.createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12, timeoutStrategy);
- SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
- BatchEventProcessor<RequestEvent> requestProcessor =
- new BatchEventProcessor<>(requestRing, requestSequenceBarrier, this);
- requestRing.addGatingSequences(requestProcessor.getSequence());
- requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
- ExecutorService requestExec = Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder().setNameFormat("request-%d").build());
- // Each processor runs on a separate thread
- requestExec.submit(requestProcessor);
-
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index a23a913..776f0ed 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -19,25 +19,21 @@ package org.apache.omid.tso;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
import org.apache.commons.pool2.ObjectPool;
import org.apache.omid.committable.CommitTable;
import org.apache.omid.committable.CommitTable.CommitTimestamp;
import org.apache.omid.metrics.Meter;
import org.apache.omid.metrics.MetricsRegistry;
-
import org.jboss.netty.channel.Channel;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
-
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -45,6 +41,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import static com.codahale.metrics.MetricRegistry.name;
+import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
+import static org.apache.omid.tso.RetryProcessorImpl.RetryEvent.EVENT_FACTORY;
/**
* Manages the disambiguation of the retry requests that clients send when they did not receive a response in the
@@ -74,20 +72,26 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
ObjectPool<Batch> batchPool)
throws InterruptedException, ExecutionException, IOException {
+ // ------------------------------------------------------------------------------------------------------------
+ // Disruptor initialization
+ // ------------------------------------------------------------------------------------------------------------
+
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
+ ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory);
+
+ Disruptor<RetryProcessorImpl.RetryEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, requestExec, SINGLE, new YieldingWaitStrategy());
+ disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This should be before the event handler
+ disruptor.handleEventsWith(this);
+ this.retryRing = disruptor.start();
+
+ // ------------------------------------------------------------------------------------------------------------
+ // Attribute initialization
+ // ------------------------------------------------------------------------------------------------------------
+
this.commitTableClient = commitTable.getClient();
this.replyProc = replyProc;
this.batchPool = batchPool;
- retryRing = RingBuffer.createSingleProducer(RetryEvent.EVENT_FACTORY, 1 << 12, new YieldingWaitStrategy());
- SequenceBarrier retrySequenceBarrier = retryRing.newBarrier();
- BatchEventProcessor<RetryEvent> retryProcessor = new BatchEventProcessor<>(retryRing, retrySequenceBarrier, this);
- retryProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
- retryRing.addGatingSequences(retryProcessor.getSequence());
-
- ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
- ExecutorService retryExec = Executors.newSingleThreadExecutor(threadFactory);
- retryExec.submit(retryProcessor);
-
// Metrics configuration
this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
@@ -116,7 +120,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
long startTimestamp = event.getStartTimestamp();
try {
Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
- if(commitTimestamp.isPresent()) {
+ if (commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());