You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@omid.apache.org by fp...@apache.org on 2016/06/03 21:27:47 UTC

[20/23] incubator-omid git commit: [OMID-37] Improve creation of Disruptor components

[OMID-37] Improve creation of Disruptor components

Use the Disruptor API to create Omid processor components

Change-Id: I5d164bead9de6536495f08578cf6a02171b5d69f


Project: http://git-wip-us.apache.org/repos/asf/incubator-omid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-omid/commit/9722e401
Tree: http://git-wip-us.apache.org/repos/asf/incubator-omid/tree/9722e401
Diff: http://git-wip-us.apache.org/repos/asf/incubator-omid/diff/9722e401

Branch: refs/heads/master
Commit: 9722e4013cd2139395dea5b6315dfaac59e6b84b
Parents: 3236900
Author: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Authored: Wed May 25 21:58:38 2016 -0700
Committer: Francisco Perez-Sorrosal <fp...@yahoo-inc.com>
Committed: Wed May 25 21:59:26 2016 -0700

----------------------------------------------------------------------
 .../omid/tso/PersistenceProcessorImpl.java      | 37 ++++++++---------
 .../org/apache/omid/tso/ReplyProcessorImpl.java | 32 ++++++++-------
 .../apache/omid/tso/RequestProcessorImpl.java   | 42 +++++++++++---------
 .../org/apache/omid/tso/RetryProcessorImpl.java | 36 +++++++++--------
 4 files changed, 80 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
index 4fe21d2..5204332 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorImpl.java
@@ -24,6 +24,7 @@ import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.WorkerPool;
+import com.lmax.disruptor.dsl.Disruptor;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -39,6 +40,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
 import static org.apache.omid.metrics.MetricsUtils.name;
 import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.EVENT_FACTORY;
 import static org.apache.omid.tso.PersistenceProcessorImpl.PersistBatchEvent.makePersistBatch;
@@ -47,8 +49,6 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
 
     private static final Logger LOG = LoggerFactory.getLogger(PersistenceProcessorImpl.class);
 
-    private static final long INITIAL_LWM_VALUE = -1L;
-
     private final RingBuffer<PersistBatchEvent> persistRing;
 
     private final ObjectPool<Batch> batchPool;
@@ -73,29 +73,30 @@ class PersistenceProcessorImpl implements PersistenceProcessor {
                              MetricsRegistry metrics)
             throws Exception {
 
+        // ------------------------------------------------------------------------------------------------------------
+        // Disruptor initialization
+        // ------------------------------------------------------------------------------------------------------------
+
+        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
+        ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(), threadFactory.build());
+
+        Disruptor<PersistBatchEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 20, requestExec , SINGLE, new BusySpinWaitStrategy());
+        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
+        disruptor.handleEventsWithWorkerPool(handlers);
+        this.persistRing = disruptor.start();
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Attribute initialization
+        // ------------------------------------------------------------------------------------------------------------
+
         this.metrics = metrics;
         this.lowWatermarkWriter = commitTable.getWriter();
         this.batchSequence = 0L;
         this.batchPool = batchPool;
         this.currentBatch = batchPool.borrowObject();
-
         // Low Watermark writer
         ThreadFactoryBuilder lwmThreadFactory = new ThreadFactoryBuilder().setNameFormat("lwm-writer-%d");
-        lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
-
-        // Disruptor configuration
-        this.persistRing = RingBuffer.createSingleProducer(EVENT_FACTORY, 1 << 20, new BusySpinWaitStrategy());
-
-        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("persist-%d");
-        ExecutorService requestExec = Executors.newFixedThreadPool(config.getNumConcurrentCTWriters(),
-                                                                   threadFactory.build());
-
-        WorkerPool<PersistBatchEvent> persistProcessor = new WorkerPool<>(persistRing,
-                                                                          persistRing.newBarrier(),
-                                                                          new FatalExceptionHandler(panicker),
-                                                                          handlers);
-        this.persistRing.addGatingSequences(persistProcessor.getWorkerSequences());
-        persistProcessor.start(requestExec);
+        this.lowWatermarkWriterExecutor = Executors.newSingleThreadExecutor(lwmThreadFactory.build());
 
         // Metrics config
         this.lwmWriteTimer = metrics.timer(name("tso", "lwmWriter", "latency"));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
index ab257d6..d5e4c00 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java
@@ -20,12 +20,11 @@ package org.apache.omid.tso;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
-import com.lmax.disruptor.BatchEventProcessor;
 import com.lmax.disruptor.BusySpinWaitStrategy;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
+import com.lmax.disruptor.dsl.Disruptor;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.metrics.Meter;
 import org.apache.omid.metrics.MetricsRegistry;
@@ -38,10 +37,10 @@ import java.util.Comparator;
 import java.util.PriorityQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static com.codahale.metrics.MetricRegistry.name;
+import static com.lmax.disruptor.dsl.ProducerType.MULTI;
 
 class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEvent>, ReplyProcessor {
 
@@ -65,28 +64,31 @@ class ReplyProcessorImpl implements EventHandler<ReplyProcessorImpl.ReplyBatchEv
     @Inject
     ReplyProcessorImpl(MetricsRegistry metrics, Panicker panicker, ObjectPool<Batch> batchPool) {
 
-        this.batchPool = batchPool;
-
-        this.nextIDToHandle.set(0);
+        // ------------------------------------------------------------------------------------------------------------
+        // Disruptor initialization
+        // ------------------------------------------------------------------------------------------------------------
 
-        this.replyRing = RingBuffer.createMultiProducer(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, new BusySpinWaitStrategy());
+        ThreadFactoryBuilder threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d");
+        ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory.build());
 
-        SequenceBarrier replySequenceBarrier = replyRing.newBarrier();
-        BatchEventProcessor<ReplyBatchEvent> replyProcessor = new BatchEventProcessor<>(replyRing, replySequenceBarrier, this);
-        replyProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
+        Disruptor<ReplyProcessorImpl.ReplyBatchEvent> disruptor = new Disruptor<>(ReplyBatchEvent.EVENT_FACTORY, 1 << 12, requestExec, MULTI, new BusySpinWaitStrategy());
+        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker));
+        disruptor.handleEventsWith(this);
+        this.replyRing = disruptor.start();
 
-        replyRing.addGatingSequences(replyProcessor.getSequence());
-
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reply-%d").build();
-        ExecutorService replyExec = Executors.newSingleThreadExecutor(threadFactory);
-        replyExec.submit(replyProcessor);
+        // ------------------------------------------------------------------------------------------------------------
+        // Attribute initialization
+        // ------------------------------------------------------------------------------------------------------------
 
+        this.batchPool = batchPool;
+        this.nextIDToHandle.set(0);
         this.futureEvents = new PriorityQueue<>(10, new Comparator<ReplyBatchEvent>() {
             public int compare(ReplyBatchEvent replyBatchEvent1, ReplyBatchEvent replyBatchEvent2) {
                 return Long.compare(replyBatchEvent1.getBatchSequence(), replyBatchEvent2.getBatchSequence());
             }
         });
 
+        // Metrics config
         this.abortMeter = metrics.meter(name("tso", "aborts"));
         this.commitMeter = metrics.meter(name("tso", "commits"));
         this.timestampMeter = metrics.meter(name("tso", "timestampAllocation"));

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
index 94a3df5..b945eaf 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RequestProcessorImpl.java
@@ -25,6 +25,7 @@ import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.TimeoutBlockingWaitStrategy;
 import com.lmax.disruptor.TimeoutHandler;
+import com.lmax.disruptor.dsl.Disruptor;
 import org.apache.omid.metrics.MetricsRegistry;
 import org.apache.omid.tso.TSOStateManager.TSOState;
 import org.jboss.netty.channel.Channel;
@@ -38,8 +39,13 @@ import java.util.Iterator;
 import java.util.NoSuchElementException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import static com.lmax.disruptor.dsl.ProducerType.MULTI;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.omid.tso.RequestProcessorImpl.RequestEvent.EVENT_FACTORY;
+
 class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestEvent>, RequestProcessor, TimeoutHandler {
 
     private static final Logger LOG = LoggerFactory.getLogger(RequestProcessorImpl.class);
@@ -59,29 +65,29 @@ class RequestProcessorImpl implements EventHandler<RequestProcessorImpl.RequestE
                          TSOServerConfig config)
             throws IOException {
 
-        this.metrics = metrics;
+        // ------------------------------------------------------------------------------------------------------------
+        // Disruptor initialization
+        // ------------------------------------------------------------------------------------------------------------
+
+        TimeoutBlockingWaitStrategy timeoutStrategy = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), MILLISECONDS);
+
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("request-%d").build();
+        ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory);
+
+        Disruptor<RequestEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, requestExec, MULTI, timeoutStrategy);
+        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This should be before the event handler
+        disruptor.handleEventsWith(this);
+        this.requestRing = disruptor.start();
 
+        // ------------------------------------------------------------------------------------------------------------
+        // Attribute initialization
+        // ------------------------------------------------------------------------------------------------------------
+
+        this.metrics = metrics;
         this.persistProc = persistProc;
         this.timestampOracle = timestampOracle;
-
         this.hashmap = new CommitHashMap(config.getMaxItems());
 
-        final TimeoutBlockingWaitStrategy timeoutStrategy
-                = new TimeoutBlockingWaitStrategy(config.getBatchPersistTimeoutInMs(), TimeUnit.MILLISECONDS);
-
-        // Set up the disruptor thread
-        requestRing = RingBuffer.createMultiProducer(RequestEvent.EVENT_FACTORY, 1 << 12, timeoutStrategy);
-        SequenceBarrier requestSequenceBarrier = requestRing.newBarrier();
-        BatchEventProcessor<RequestEvent> requestProcessor =
-                new BatchEventProcessor<>(requestRing, requestSequenceBarrier, this);
-        requestRing.addGatingSequences(requestProcessor.getSequence());
-        requestProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-
-        ExecutorService requestExec = Executors.newSingleThreadExecutor(
-                new ThreadFactoryBuilder().setNameFormat("request-%d").build());
-        // Each processor runs on a separate thread
-        requestExec.submit(requestProcessor);
-
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-omid/blob/9722e401/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
index a23a913..776f0ed 100644
--- a/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
+++ b/tso-server/src/main/java/org/apache/omid/tso/RetryProcessorImpl.java
@@ -19,25 +19,21 @@ package org.apache.omid.tso;
 
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.lmax.disruptor.BatchEventProcessor;
 import com.lmax.disruptor.EventFactory;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.YieldingWaitStrategy;
+import com.lmax.disruptor.dsl.Disruptor;
 import org.apache.commons.pool2.ObjectPool;
 import org.apache.omid.committable.CommitTable;
 import org.apache.omid.committable.CommitTable.CommitTimestamp;
 import org.apache.omid.metrics.Meter;
 import org.apache.omid.metrics.MetricsRegistry;
-
 import org.jboss.netty.channel.Channel;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.inject.Inject;
-
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -45,6 +41,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 
 import static com.codahale.metrics.MetricRegistry.name;
+import static com.lmax.disruptor.dsl.ProducerType.SINGLE;
+import static org.apache.omid.tso.RetryProcessorImpl.RetryEvent.EVENT_FACTORY;
 
 /**
  * Manages the disambiguation of the retry requests that clients send when they did not receive a response in the
@@ -74,20 +72,26 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
                        ObjectPool<Batch> batchPool)
             throws InterruptedException, ExecutionException, IOException {
 
+        // ------------------------------------------------------------------------------------------------------------
+        // Disruptor initialization
+        // ------------------------------------------------------------------------------------------------------------
+
+        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
+        ExecutorService requestExec = Executors.newSingleThreadExecutor(threadFactory);
+
+        Disruptor<RetryProcessorImpl.RetryEvent> disruptor = new Disruptor<>(EVENT_FACTORY, 1 << 12, requestExec, SINGLE, new YieldingWaitStrategy());
+        disruptor.handleExceptionsWith(new FatalExceptionHandler(panicker)); // This should be before the event handler
+        disruptor.handleEventsWith(this);
+        this.retryRing = disruptor.start();
+
+        // ------------------------------------------------------------------------------------------------------------
+        // Attribute initialization
+        // ------------------------------------------------------------------------------------------------------------
+
         this.commitTableClient = commitTable.getClient();
         this.replyProc = replyProc;
         this.batchPool = batchPool;
 
-        retryRing = RingBuffer.createSingleProducer(RetryEvent.EVENT_FACTORY, 1 << 12, new YieldingWaitStrategy());
-        SequenceBarrier retrySequenceBarrier = retryRing.newBarrier();
-        BatchEventProcessor<RetryEvent> retryProcessor = new BatchEventProcessor<>(retryRing, retrySequenceBarrier, this);
-        retryProcessor.setExceptionHandler(new FatalExceptionHandler(panicker));
-        retryRing.addGatingSequences(retryProcessor.getSequence());
-
-        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("retry-%d").build();
-        ExecutorService retryExec = Executors.newSingleThreadExecutor(threadFactory);
-        retryExec.submit(retryProcessor);
-
         // Metrics configuration
         this.txAlreadyCommittedMeter = metrics.meter(name("tso", "retries", "commits", "tx-already-committed"));
         this.invalidTxMeter = metrics.meter(name("tso", "retries", "aborts", "tx-invalid"));
@@ -116,7 +120,7 @@ class RetryProcessorImpl implements EventHandler<RetryProcessorImpl.RetryEvent>,
         long startTimestamp = event.getStartTimestamp();
         try {
             Optional<CommitTimestamp> commitTimestamp = commitTableClient.getCommitTimestamp(startTimestamp).get();
-            if(commitTimestamp.isPresent()) {
+            if (commitTimestamp.isPresent()) {
                 if (commitTimestamp.get().isValid()) {
                     LOG.trace("Tx {}: Valid commit TS found in Commit Table. Sending Commit to client.", startTimestamp);
                     replyProc.sendCommitResponse(startTimestamp, commitTimestamp.get().getValue(), event.getChannel());