You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/31 02:23:21 UTC
[6/8] git commit: fix DisruptorQueueTest
fix DisruptorQueueTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/8c4684cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/8c4684cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/8c4684cf
Branch: refs/heads/master
Commit: 8c4684cfca25aa02e815497ad489193bd3d3d495
Parents: 7d50fbe
Author: Boris Aksenov <ak...@corp.finam.ru>
Authored: Sat Jul 5 16:13:45 2014 +0400
Committer: Boris Aksenov <ak...@corp.finam.ru>
Committed: Sat Jul 5 16:13:45 2014 +0400
----------------------------------------------------------------------
.../backtype/storm/utils/DisruptorQueue.java | 1 -
.../storm/utils/DisruptorQueueTest.java | 25 ++++++++++----------
2 files changed, 12 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8c4684cf/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index d495ccf..7c56fe5 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -29,7 +29,6 @@ import com.lmax.disruptor.WaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.HashMap;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/8c4684cf/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
index 653fd33..835b4d9 100644
--- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -17,15 +17,15 @@
*/
package backtype.storm.utils;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import com.lmax.disruptor.dsl.ProducerType;
+import junit.framework.TestCase;
import org.junit.Assert;
import org.junit.Test;
-import junit.framework.TestCase;
+
+import java.util.concurrent.atomic.AtomicBoolean;
public class DisruptorQueueTest extends TestCase {
@@ -42,7 +42,7 @@ public class DisruptorQueueTest extends TestCase {
Runnable producer = new Producer(queue, "2");
- final Object [] result = new Object[1];
+ final Object[] result = new Object[1];
Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
private boolean head = true;
@@ -60,8 +60,8 @@ public class DisruptorQueueTest extends TestCase {
Assert.assertEquals("We expect to receive first published message first, but received " + result[0],
"1", result[0]);
}
-
- @Test
+
+ @Test
public void testConsumerHang() throws InterruptedException {
final AtomicBoolean messageConsumed = new AtomicBoolean(false);
@@ -91,10 +91,10 @@ public class DisruptorQueueTest extends TestCase {
producerThreads[i] = new Thread(producer);
producerThreads[i].start();
}
-
+
Thread consumerThread = new Thread(consumer);
consumerThread.start();
-
+
for (int i = 0; i < PRODUCER_NUM; i++) {
producerThreads[i].interrupt();
producerThreads[i].join(TIMEOUT);
@@ -122,7 +122,7 @@ public class DisruptorQueueTest extends TestCase {
return;
}
}
- };
+ }
private class Consumer implements Runnable {
private EventHandler handler;
@@ -144,10 +144,9 @@ public class DisruptorQueueTest extends TestCase {
//break
}
}
- };
+ }
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
- queueSize), new BlockingWaitStrategy());
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, new BlockingWaitStrategy());
}
}