You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/19 23:06:13 UTC

[02/23] git commit: STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/72b1f592
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/72b1f592
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/72b1f592

Branch: refs/heads/security
Commit: 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Parents: c89fb82
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 10 19:54:11 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 10 19:54:11 2014 +0800

----------------------------------------------------------------------
 storm-core/pom.xml                              |   7 +
 .../backtype/storm/utils/DisruptorQueue.java    |  70 ++++++---
 .../storm/utils/DisruptorQueueTest.java         | 154 +++++++++++++++++++
 3 files changed, 210 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..26f08cb 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -176,9 +176,16 @@
             <artifactId>conjure</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+        	<groupId>junit</groupId>
+        	<artifactId>junit</artifactId>
+        	<version>4.1</version>
+        	<scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>
+         <testSourceDirectory>test/jvm</testSourceDirectory>
         <resources>
             <resource>
                 <directory>../conf</directory>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 8c5b466..0068964 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -27,13 +27,15 @@ import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.SingleThreadedClaimStrategy;
 import com.lmax.disruptor.WaitStrategy;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.HashMap;
 import java.util.Map;
 import backtype.storm.metric.api.IStatefulObject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+
 
 /**
  *
@@ -51,6 +53,11 @@ public class DisruptorQueue implements IStatefulObject {
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+    
+    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+    private final Lock cacheReadLock  = cacheLock.readLock();
+    private final Lock cacheWriteLock = cacheLock.writeLock();
+    
     private static String PREFIX = "disruptor-";
     private String _queueName = "";
     
@@ -62,6 +69,13 @@ public class DisruptorQueue implements IStatefulObject {
         _buffer.setGatingSequences(_consumer);
         if(claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
+        } else {
+            // make sure we flush the pending messages in cache first
+            try {
+                publishDirect(FLUSH_CACHE, true);
+            } catch (InsufficientCapacityException e) {
+                throw new RuntimeException("This code should be unreachable!");
+            }
         }
     }
     
@@ -134,33 +148,47 @@ public class DisruptorQueue implements IStatefulObject {
     }
     
     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
-        if(consumerStartedFlag) {
-            final long id;
-            if(block) {
-                id = _buffer.next();
-            } else {
-                id = _buffer.tryNext(1);
+        // Read the volatile flag once; when it is already set, the cache lock is never taken.
+        boolean publishNow = consumerStartedFlag;
+        // Otherwise re-check under the read lock: obj is cached only if the flag is still unset.
+        if (!publishNow) {
+            cacheReadLock.lock(); 
+            try {
+                publishNow = consumerStartedFlag;
+                if (!publishNow) {
+                    _cache.add(obj);
+                }
+            } finally {
+                cacheReadLock.unlock();
             }
-            final MutableObject m = _buffer.get(id);
-            m.setObject(obj);
-            _buffer.publish(id);
-        } else {
-            _cache.add(obj);
-            if(consumerStartedFlag) flushCache();
+        }
+        
+        if (publishNow) {
+            publishDirect(obj, block);
         }
     }
     
-    public void consumerStarted() {
-        if(!consumerStartedFlag) {
-            consumerStartedFlag = true;
-            flushCache();
+    private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+        final long id;
+        if(block) {
+            id = _buffer.next();
+        } else {
+            id = _buffer.tryNext(1);
         }
+        final MutableObject m = _buffer.get(id);
+        m.setObject(obj);
+        _buffer.publish(id);
     }
     
-    private void flushCache() {
-        publish(FLUSH_CACHE);
-    }
+    public void consumerStarted() {
+        // Set the flag first so publishers that check it from now on stop adding to the cache.
+        consumerStartedFlag = true;
+        // A publisher may still be inside its read-locked section, adding to the cache.
+        // Taking and releasing the write lock waits for all pending cache add operations to complete.
+        cacheWriteLock.lock();
+        cacheWriteLock.unlock();
+    }
+    
     public long  population() { return (writePos() - readPos()); }
     public long  capacity()   { return _buffer.getBufferSize(); }
     public long  writePos()   { return _buffer.getCursor(); }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
new file mode 100644
index 0000000..f21b10f
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class DisruptorQueueTest extends TestCase {
+    // Regression tests for STORM-342: several threads publish while the consumer starts.
+    private final static int TIMEOUT = 5; // MS
+    private final static int PRODUCER_NUM = 4;
+
+    @Test
+    public void testMessageDisorder() throws InterruptedException {
+        // The message published before the consumer starts must be the first one consumed.
+        // Use a queue length that is big enough
+        DisruptorQueue queue = createQueue("messageOrder", 16);
+
+        queue.publish("1");
+
+        Runnable producer = new Producer(queue, "2");
+
+        final Object [] result = new Object[1];
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            private boolean head = true;
+
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                if (head) {
+                    head = false;
+                    result[0] = obj;
+                }
+            }
+        });
+        // Run the threads before asserting: result[0] stays null until the consumer has run.
+        run(producer, consumer);
+        Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
+                "1", result[0]);
+    }
+    
+    @Test 
+    public void testConsumerHang() throws InterruptedException {
+        final AtomicBoolean messageConsumed = new AtomicBoolean(false);
+
+        // Set queue length to 1, so that the RingBuffer can be easily full
+        // to trigger consumer blocking
+        DisruptorQueue queue = createQueue("consumerHang", 1);
+        Runnable producer = new Producer(queue, "msg");
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                messageConsumed.set(true);
+            }
+        });
+
+        run(producer, consumer);
+        Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs",
+                messageConsumed.get());
+    }
+
+    /** Starts PRODUCER_NUM producer threads and one consumer thread, then interrupts and joins each. */
+    private void run(Runnable producer, Runnable consumer)
+            throws InterruptedException {
+
+        Thread[] producerThreads = new Thread[PRODUCER_NUM];
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i] = new Thread(producer);
+            producerThreads[i].start();
+        }
+        
+        Thread consumerThread = new Thread(consumer);
+        consumerThread.start();
+                
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i].interrupt();
+            producerThreads[i].join(TIMEOUT);
+        }
+        consumerThread.interrupt();
+        consumerThread.join(TIMEOUT);
+        // join(TIMEOUT) waits at most TIMEOUT ms, so a thread may still be running on return.
+    }
+
+    private static class Producer implements Runnable {
+        private final String msg;
+        private final DisruptorQueue queue;
+
+        Producer(DisruptorQueue queue, String msg) {
+            this.msg = msg;
+            this.queue = queue;
+        }
+        /** Publishes msg with block=false in a loop until InsufficientCapacityException is thrown. */
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    queue.publish(msg, false);
+                }
+            } catch (InsufficientCapacityException e) {
+                return;
+            }
+        }
+    }
+
+    private static class Consumer implements Runnable {
+        private final EventHandler<Object> handler;
+        private final DisruptorQueue queue;
+
+        Consumer(DisruptorQueue queue, EventHandler<Object> handler) {
+            this.handler = handler;
+            this.queue = queue;
+        }
+        /** Marks the consumer as started, then consumes batches until a RuntimeException is thrown. */
+        @Override
+        public void run() {
+            queue.consumerStarted();
+            try {
+                while(true) {
+                    queue.consumeBatchWhenAvailable(handler);
+                }
+            } catch (RuntimeException ignored) {
+                // deliberate: leave the loop (presumably raised when run() interrupts this thread)
+            }
+        }
+    }
+
+    private static DisruptorQueue createQueue(String name, int queueSize) {
+        return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
+                queueSize), new BlockingWaitStrategy());
+    }
+}