You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/31 02:23:21 UTC

[6/8] git commit: fix DisruptorQueueTest

fix DisruptorQueueTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8c4684cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8c4684cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8c4684cf

Branch: refs/heads/master
Commit: 8c4684cfca25aa02e815497ad489193bd3d3d495
Parents: 7d50fbe
Author: Boris Aksenov <ak...@corp.finam.ru>
Authored: Sat Jul 5 16:13:45 2014 +0400
Committer: Boris Aksenov <ak...@corp.finam.ru>
Committed: Sat Jul 5 16:13:45 2014 +0400

----------------------------------------------------------------------
 .../backtype/storm/utils/DisruptorQueue.java    |  1 -
 .../storm/utils/DisruptorQueueTest.java         | 25 ++++++++++----------
 2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8c4684cf/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index d495ccf..7c56fe5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -29,7 +29,6 @@ import com.lmax.disruptor.WaitStrategy;
 import com.lmax.disruptor.dsl.ProducerType;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.HashMap;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8c4684cf/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 653fd33..835b4d9 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -17,15 +17,15 @@
  */
 package backtype.storm.utils;
 
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import com.lmax.disruptor.BlockingWaitStrategy;
 import com.lmax.disruptor.EventHandler;
 import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import com.lmax.disruptor.dsl.ProducerType;
+import junit.framework.TestCase;
 import org.junit.Assert;
 import org.junit.Test;
-import junit.framework.TestCase;
+
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class DisruptorQueueTest extends TestCase {
 
@@ -42,7 +42,7 @@ public class DisruptorQueueTest extends TestCase {
 
         Runnable producer = new Producer(queue, "2");
 
-        final Object [] result = new Object[1];
+        final Object[] result = new Object[1];
         Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
             private boolean head = true;
 
@@ -60,8 +60,8 @@ public class DisruptorQueueTest extends TestCase {
         Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
                 "1", result[0]);
     }
-    
-    @Test 
+
+    @Test
     public void testConsumerHang() throws InterruptedException {
         final AtomicBoolean messageConsumed = new AtomicBoolean(false);
 
@@ -91,10 +91,10 @@ public class DisruptorQueueTest extends TestCase {
             producerThreads[i] = new Thread(producer);
             producerThreads[i].start();
         }
-        
+
         Thread consumerThread = new Thread(consumer);
         consumerThread.start();
-                
+
         for (int i = 0; i < PRODUCER_NUM; i++) {
             producerThreads[i].interrupt();
             producerThreads[i].join(TIMEOUT);
@@ -122,7 +122,7 @@ public class DisruptorQueueTest extends TestCase {
                 return;
             }
         }
-    };
+    }
 
     private class Consumer implements Runnable {
         private EventHandler handler;
@@ -144,10 +144,9 @@ public class DisruptorQueueTest extends TestCase {
                 //break
             }
         }
-    };
+    }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
-                queueSize), new BlockingWaitStrategy());
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, new BlockingWaitStrategy());
     }
 }