You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/06/12 23:03:13 UTC

[1/7] git commit: STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.

Repository: incubator-storm
Updated Branches:
  refs/heads/master 0826b9332 -> 2bad66eae


STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/72b1f592
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/72b1f592
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/72b1f592

Branch: refs/heads/master
Commit: 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Parents: c89fb82
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 10 19:54:11 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 10 19:54:11 2014 +0800

----------------------------------------------------------------------
 storm-core/pom.xml                              |   7 +
 .../backtype/storm/utils/DisruptorQueue.java    |  70 ++++++---
 .../storm/utils/DisruptorQueueTest.java         | 154 +++++++++++++++++++
 3 files changed, 210 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..26f08cb 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -176,9 +176,16 @@
             <artifactId>conjure</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+        	<groupId>junit</groupId>
+        	<artifactId>junit</artifactId>
+        	<version>4.1</version>
+        	<scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <sourceDirectory>src/jvm</sourceDirectory>
+         <testSourceDirectory>test/jvm</testSourceDirectory>
         <resources>
             <resource>
                 <directory>../conf</directory>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 8c5b466..0068964 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -27,13 +27,15 @@ import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.SequenceBarrier;
 import com.lmax.disruptor.SingleThreadedClaimStrategy;
 import com.lmax.disruptor.WaitStrategy;
+
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.HashMap;
 import java.util.Map;
 import backtype.storm.metric.api.IStatefulObject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+
 
 /**
  *
@@ -51,6 +53,11 @@ public class DisruptorQueue implements IStatefulObject {
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+    
+    private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+    private final Lock cacheReadLock  = cacheLock.readLock();
+    private final Lock cacheWriteLock = cacheLock.writeLock();
+    
     private static String PREFIX = "disruptor-";
     private String _queueName = "";
     
@@ -62,6 +69,13 @@ public class DisruptorQueue implements IStatefulObject {
         _buffer.setGatingSequences(_consumer);
         if(claim instanceof SingleThreadedClaimStrategy) {
             consumerStartedFlag = true;
+        } else {
+            // make sure we flush the pending messages in cache first
+            try {
+                publishDirect(FLUSH_CACHE, true);
+            } catch (InsufficientCapacityException e) {
+                throw new RuntimeException("This code should be unreachable!");
+            }
         }
     }
     
@@ -134,33 +148,47 @@ public class DisruptorQueue implements IStatefulObject {
     }
     
     public void publish(Object obj, boolean block) throws InsufficientCapacityException {
-        if(consumerStartedFlag) {
-            final long id;
-            if(block) {
-                id = _buffer.next();
-            } else {
-                id = _buffer.tryNext(1);
+
+        boolean publishNow = consumerStartedFlag;
+
+        if (!publishNow) {
+            cacheReadLock.lock(); 
+            try {
+                publishNow = consumerStartedFlag;
+                if (!publishNow) {
+                    _cache.add(obj);
+                }
+            } finally {
+                cacheReadLock.unlock();
             }
-            final MutableObject m = _buffer.get(id);
-            m.setObject(obj);
-            _buffer.publish(id);
-        } else {
-            _cache.add(obj);
-            if(consumerStartedFlag) flushCache();
+        }
+        
+        if (publishNow) {
+            publishDirect(obj, block);
         }
     }
     
-    public void consumerStarted() {
-        if(!consumerStartedFlag) {
-            consumerStartedFlag = true;
-            flushCache();
+    private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+        final long id;
+        if(block) {
+            id = _buffer.next();
+        } else {
+            id = _buffer.tryNext(1);
         }
+        final MutableObject m = _buffer.get(id);
+        m.setObject(obj);
+        _buffer.publish(id);
     }
     
-    private void flushCache() {
-        publish(FLUSH_CACHE);
-    }
+    public void consumerStarted() {
 
+        consumerStartedFlag = true;
+        
+        // Use writeLock to make sure all pending cache add operations have completed
+        cacheWriteLock.lock();
+        cacheWriteLock.unlock();
+    }
+    
     public long  population() { return (writePos() - readPos()); }
     public long  capacity()   { return _buffer.getBufferSize(); }
     public long  writePos()   { return _buffer.getCursor(); }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
new file mode 100644
index 0000000..f21b10f
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class DisruptorQueueTest extends TestCase {
+
+    private final static int TIMEOUT = 5; // MS
+    private final static int PRODUCER_NUM = 4;
+
+    @Test
+    public void testMessageDisorder() throws InterruptedException {
+
+        // Set the queue length to be big enough
+        DisruptorQueue queue = createQueue("messageOrder", 16);
+
+        queue.publish("1");
+
+        Runnable producer = new Producer(queue, "2");
+
+        final Object [] result = new Object[1];
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            private boolean head = true;
+
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                if (head) {
+                    head = false;
+                    result[0] = obj;
+                }
+            }
+        });
+        
+        Assert.assertEquals("We expect to receive first published message first, but received " + result[0].toString(), 
+                "1", result[0]);
+        run(producer, consumer);
+    }
+    
+    @Test 
+    public void testConsumerHang() throws InterruptedException {
+        final AtomicBoolean messageConsumed = new AtomicBoolean(false);
+
+        // Set queue length to 1, so that the RingBuffer can easily become full
+        // to trigger consumer blocking
+        DisruptorQueue queue = createQueue("consumerHang", 1);
+        Runnable producer = new Producer(queue, "msg");
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                messageConsumed.set(true);
+            }
+        });
+
+        run(producer, consumer);
+        Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs",
+                messageConsumed.get());
+    }
+
+
+    private void run(Runnable producer, Runnable consumer)
+            throws InterruptedException {
+
+        Thread[] producerThreads = new Thread[PRODUCER_NUM];
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i] = new Thread(producer);
+            producerThreads[i].start();
+        }
+        
+        Thread consumerThread = new Thread(consumer);
+        consumerThread.start();
+                
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i].interrupt();
+            producerThreads[i].join(TIMEOUT);
+        }
+        consumerThread.interrupt();
+        consumerThread.join(TIMEOUT);
+        //consumerThread.stop();
+    }
+
+    private class Producer implements Runnable {
+        private String msg;
+        private DisruptorQueue queue;
+
+        Producer(DisruptorQueue queue, String msg) {
+            this.msg = msg;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    queue.publish(msg, false);
+                }
+            } catch (InsufficientCapacityException e) {
+                return;
+            }
+        }
+    };
+
+    private class Consumer implements Runnable {
+        private EventHandler handler;
+        private DisruptorQueue queue;
+
+        Consumer(DisruptorQueue queue, EventHandler handler) {
+            this.handler = handler;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            queue.consumerStarted();
+            try {
+                while(true) {
+                    queue.consumeBatchWhenAvailable(handler);
+                }
+            }catch(RuntimeException e) {
+                //break
+            }
+        }
+    };
+
+    private static DisruptorQueue createQueue(String name, int queueSize) {
+        return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
+                queueSize), new BlockingWaitStrategy());
+    }
+}


[7/7] git commit: revert poms for 0.9.2-incubating re-release

Posted by pt...@apache.org.
revert poms for 0.9.2-incubating re-release


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/2bad66ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/2bad66ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/2bad66ea

Branch: refs/heads/master
Commit: 2bad66eae90651df7424b389f099e62fe8b9ec5f
Parents: 5299531
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Jun 12 17:02:36 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Jun 12 17:02:36 2014 -0400

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                           | 2 +-
 external/storm-kafka/pom.xml                             | 2 +-
 pom.xml                                                  | 2 +-
 storm-buildtools/maven-shade-clojure-transformer/pom.xml | 2 +-
 storm-core/pom.xml                                       | 2 +-
 storm-dist/binary/pom.xml                                | 2 +-
 storm-dist/source/pom.xml                                | 2 +-
 7 files changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 903c6e7..182546f 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -20,7 +20,7 @@
   <parent>
       <artifactId>storm</artifactId>
       <groupId>org.apache.storm</groupId>
-      <version>0.9.3-incubating-SNAPSHOT</version>
+      <version>0.9.2-incubating-SNAPSHOT</version>
       <relativePath>../../pom.xml</relativePath>
   </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 4972619..ff411a3 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index b7286dc..ce10cc2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -27,7 +27,7 @@
 
     <groupId>org.apache.storm</groupId>
     <artifactId>storm</artifactId>
-    <version>0.9.3-incubating-SNAPSHOT</version>
+    <version>0.9.2-incubating-SNAPSHOT</version>
     <packaging>pom</packaging>
     <name>Storm</name>
     <description>Distributed and fault-tolerant realtime computation</description>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/storm-buildtools/maven-shade-clojure-transformer/pom.xml
----------------------------------------------------------------------
diff --git a/storm-buildtools/maven-shade-clojure-transformer/pom.xml b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
index a6fbad1..7dfa2a2 100644
--- a/storm-buildtools/maven-shade-clojure-transformer/pom.xml
+++ b/storm-buildtools/maven-shade-clojure-transformer/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 45878f6..26f08cb 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -20,7 +20,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-SNAPSHOT</version>
     </parent>
     <groupId>org.apache.storm</groupId>
     <artifactId>storm-core</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/storm-dist/binary/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/pom.xml b/storm-dist/binary/pom.xml
index 0d97c0b..4fa3a27 100644
--- a/storm-dist/binary/pom.xml
+++ b/storm-dist/binary/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/2bad66ea/storm-dist/source/pom.xml
----------------------------------------------------------------------
diff --git a/storm-dist/source/pom.xml b/storm-dist/source/pom.xml
index 4fedefb..b5552df 100644
--- a/storm-dist/source/pom.xml
+++ b/storm-dist/source/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.9.3-incubating-SNAPSHOT</version>
+        <version>0.9.2-incubating-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>


[2/7] git commit: STORM-342: rename the read-write lock

Posted by pt...@apache.org.
STORM-342: rename the read-write lock


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/6946c34e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/6946c34e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/6946c34e

Branch: refs/heads/master
Commit: 6946c34e9f06726b46c5fb1bf12e1f186a1a8a19
Parents: 72b1f59
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 10 20:29:28 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 10 20:29:28 2014 +0800

----------------------------------------------------------------------
 .../src/jvm/backtype/storm/utils/DisruptorQueue.java    | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/6946c34e/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 0068964..75ccbbc 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -55,8 +55,8 @@ public class DisruptorQueue implements IStatefulObject {
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
     
     private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
-    private final Lock cacheReadLock  = cacheLock.readLock();
-    private final Lock cacheWriteLock = cacheLock.writeLock();
+    private final Lock readLock  = cacheLock.readLock();
+    private final Lock writeLock = cacheLock.writeLock();
     
     private static String PREFIX = "disruptor-";
     private String _queueName = "";
@@ -152,14 +152,14 @@ public class DisruptorQueue implements IStatefulObject {
         boolean publishNow = consumerStartedFlag;
 
         if (!publishNow) {
-            cacheReadLock.lock(); 
+            readLock.lock(); 
             try {
                 publishNow = consumerStartedFlag;
                 if (!publishNow) {
                     _cache.add(obj);
                 }
             } finally {
-                cacheReadLock.unlock();
+                readLock.unlock();
             }
         }
         
@@ -185,8 +185,8 @@ public class DisruptorQueue implements IStatefulObject {
         consumerStartedFlag = true;
         
         // Use writeLock to make sure all pending cache add operations have completed
-        cacheWriteLock.lock();
-        cacheWriteLock.unlock();
+        writeLock.lock();
+        writeLock.unlock();
     }
     
     public long  population() { return (writePos() - readPos()); }


[6/7] git commit: update changelog for STORM-342

Posted by pt...@apache.org.
update changelog for STORM-342


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/52995317
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/52995317
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/52995317

Branch: refs/heads/master
Commit: 5299531739b6661a7ce8d3aafb486d6dde09bbf6
Parents: 405dabb
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Jun 12 17:00:30 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Jun 12 17:00:30 2014 -0400

----------------------------------------------------------------------
 CHANGELOG.md                                                    | 5 ++---
 .../test/jvm/backtype/storm/utils/DisruptorQueueTest.java       | 1 -
 2 files changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/52995317/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ee2eae5..2e5b079 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,7 +1,6 @@
-## 0.9.3-incubating (unreleased)
- * STORM-338: Move towards idiomatic Clojure style 
-
 ## 0.9.2-incubating
+ * STORM-342: Contention in Disruptor Queue which may cause out of order or lost messages
+ * STORM-338: Move towards idiomatic Clojure style 
  * STORM-335: add drpc test for removing timed out requests from queue
  * STORM-69: Storm UI Visualizations for Topologies
  * STORM-297: Performance scaling with CPU

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/52995317/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 747d95c..653fd33 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -101,7 +101,6 @@ public class DisruptorQueueTest extends TestCase {
         }
         consumerThread.interrupt();
         consumerThread.join(TIMEOUT);
-        //consumerThread.stop();
     }
 
     private class Producer implements Runnable {


[4/7] git commit: Merge branch 'disruptor_message_loss_hang_or_disorder' of github.com:clockfly/incubator-storm

Posted by pt...@apache.org.
Merge branch 'disruptor_message_loss_hang_or_disorder' of github.com:clockfly/incubator-storm


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f1bf9cdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f1bf9cdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f1bf9cdb

Branch: refs/heads/master
Commit: f1bf9cdb5c7fd7445948f7f3097dbe8ced27d3c0
Parents: 0826b93 8b57097
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Jun 12 16:37:15 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Jun 12 16:37:15 2014 -0400

----------------------------------------------------------------------
 storm-core/pom.xml                              |   7 +
 .../backtype/storm/utils/DisruptorQueue.java    |  70 ++++++---
 .../storm/utils/DisruptorQueueTest.java         | 154 +++++++++++++++++++
 3 files changed, 210 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1bf9cdb/storm-core/pom.xml
----------------------------------------------------------------------


[3/7] git commit: STORM-342: add exception as cause when re-thrown

Posted by pt...@apache.org.
STORM-342: add exception as cause when re-thrown


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8b57097c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8b57097c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8b57097c

Branch: refs/heads/master
Commit: 8b57097c6dc39f267bcbafb671ac977872380a8f
Parents: 6946c34
Author: Sean Zhong <cl...@gmail.com>
Authored: Thu Jun 12 08:53:17 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Thu Jun 12 08:53:17 2014 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8b57097c/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 75ccbbc..932af16 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -74,7 +74,7 @@ public class DisruptorQueue implements IStatefulObject {
             try {
                 publishDirect(FLUSH_CACHE, true);
             } catch (InsufficientCapacityException e) {
-                throw new RuntimeException("This code should be unreachable!");
+                throw new RuntimeException("This code should be unreachable!", e);
             }
         }
     }


[5/7] git commit: fix testMessageDisorder() to assert after calling run()

Posted by pt...@apache.org.
fix testMessageDisorder() to assert after calling run()


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/405dabbc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/405dabbc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/405dabbc

Branch: refs/heads/master
Commit: 405dabbc8e67f72a8c04e95c0b562dc2826c01a1
Parents: f1bf9cd
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Thu Jun 12 16:55:43 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Thu Jun 12 16:55:43 2014 -0400

----------------------------------------------------------------------
 .../test/jvm/backtype/storm/utils/DisruptorQueueTest.java      | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/405dabbc/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index f21b10f..747d95c 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -55,10 +55,10 @@ public class DisruptorQueueTest extends TestCase {
                 }
             }
         });
-        
-        Assert.assertEquals("We expect to receive first published message first, but received " + result[0].toString(), 
-                "1", result[0]);
+
         run(producer, consumer);
+        Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
+                "1", result[0]);
     }
     
     @Test