You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ki...@apache.org on 2015/10/28 15:56:41 UTC

[5/8] storm git commit: Fixed issue with disruptor queue timeout. Also updated it to use AtomicReference so debugging checks can be simpler.

Fixed an issue with the disruptor queue timeout. Also updated the queue to use AtomicReference so that debugging checks can be simpler.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/945db1a6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/945db1a6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/945db1a6

Branch: refs/heads/master
Commit: 945db1a6e92c0c5d6d3c95be103ab3241a82a74f
Parents: acaa3b9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Wed Oct 14 10:47:23 2015 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Oct 26 12:43:52 2015 -0500

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/945db1a6/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 9d4261a..33cb5bf 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -38,6 +38,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.metric.internal.RateTracker;
@@ -47,11 +51,12 @@ import backtype.storm.metric.internal.RateTracker;
  * the ability to catch up to the producer by processing tuples in batches.
  */
 public class DisruptorQueue implements IStatefulObject {
+    private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class);
     private static final Object FLUSH_CACHE = new Object();
     private static final Object INTERRUPT = new Object();
     private static final String PREFIX = "disruptor-";
 
-    private final RingBuffer<MutableObject> _buffer;
+    private final RingBuffer<AtomicReference<Object>> _buffer;
     private final Sequence _consumer;
     private final SequenceBarrier _barrier;
 
@@ -115,16 +120,13 @@ public class DisruptorQueue implements IStatefulObject {
     public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
         try {
             final long nextSequence = _consumer.get() + 1;
-            long availableSequence = 0;
-            try {
-                availableSequence = _barrier.waitFor(nextSequence);
-            } catch (TimeoutException te) {
-                availableSequence = _barrier.getCursor();
-            }
+            long availableSequence = _barrier.waitFor(nextSequence);
 
             if (availableSequence >= nextSequence) {
                 consumeBatchToCursor(availableSequence, handler);
             }
+        } catch (TimeoutException te) {
+            //Ignored
         } catch (AlertException e) {
             throw new RuntimeException(e);
         } catch (InterruptedException e) {
@@ -135,10 +137,12 @@ public class DisruptorQueue implements IStatefulObject {
     private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
         for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
             try {
-                MutableObject mo = _buffer.get(curr);
-                Object o = mo.getObject();
-                mo.setObject(null);
-                if (o == FLUSH_CACHE) {
+                AtomicReference<Object> mo = _buffer.get(curr);
+                Object o = mo.getAndSet(null);
+
+                if (o == null) {
+                    LOG.error("NULL found in {}:{}", this.getName(), cursor);
+                } else if (o == FLUSH_CACHE) {
                     Object c = null;
                     while (true) {
                         c = _cache.poll();
@@ -164,7 +168,6 @@ public class DisruptorQueue implements IStatefulObject {
                 throw new RuntimeException(e);
             }
         }
-        //TODO: only set this if the consumer cursor has changed?
         _consumer.set(cursor);
     }
 
@@ -214,8 +217,8 @@ public class DisruptorQueue implements IStatefulObject {
         } else {
             id = _buffer.tryNext(1);
         }
-        final MutableObject m = _buffer.get(id);
-        m.setObject(obj);
+        final AtomicReference<Object> m = _buffer.get(id);
+        Object old = m.getAndSet(obj);
         _buffer.publish(id);
         _metrics.notifyArrivals(1);
         if (_enableBackpressure && _cb != null && _metrics.population() >= _highWaterMark) {
@@ -228,6 +231,9 @@ public class DisruptorQueue implements IStatefulObject {
                throw new RuntimeException("Exception during calling highWaterMark callback!", e);
            }
         }
+        if (old != null) {
+            LOG.warn("Tuple was overwritten in {}:{}", getName(), id);
+        }
     }
 
     public void consumerStarted() {
@@ -266,10 +272,10 @@ public class DisruptorQueue implements IStatefulObject {
         return this;
     }
 
-    public static class ObjectEventFactory implements EventFactory<MutableObject> {
+    public static class ObjectEventFactory implements EventFactory<AtomicReference<Object>> {
         @Override
-        public MutableObject newInstance() {
-            return new MutableObject();
+        public AtomicReference<Object> newInstance() {
+            return new AtomicReference<Object>();
         }
     }