You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/06/19 23:06:13 UTC
[02/23] git commit: STORM-342: Message loss, executor hang,
or message disorder due to contention in Disruptor queue under
multi-thread mode.
STORM-342: Message loss, executor hang, or message disorder due to contention in Disruptor queue under multi-thread mode.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/72b1f592
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/72b1f592
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/72b1f592
Branch: refs/heads/security
Commit: 72b1f592885abc8c02c6902aa0eb6499bacae7f2
Parents: c89fb82
Author: Sean Zhong <cl...@gmail.com>
Authored: Tue Jun 10 19:54:11 2014 +0800
Committer: Sean Zhong <cl...@gmail.com>
Committed: Tue Jun 10 19:54:11 2014 +0800
----------------------------------------------------------------------
storm-core/pom.xml | 7 +
.../backtype/storm/utils/DisruptorQueue.java | 70 ++++++---
.../storm/utils/DisruptorQueueTest.java | 154 +++++++++++++++++++
3 files changed, 210 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index fec6218..26f08cb 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -176,9 +176,16 @@
<artifactId>conjure</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.1</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<sourceDirectory>src/jvm</sourceDirectory>
+ <testSourceDirectory>test/jvm</testSourceDirectory>
<resources>
<resource>
<directory>../conf</directory>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 8c5b466..0068964 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -27,13 +27,15 @@ import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy;
+
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+
/**
*
@@ -51,6 +53,11 @@ public class DisruptorQueue implements IStatefulObject {
// TODO: consider having a threadlocal cache of this variable to speed up reads?
volatile boolean consumerStartedFlag = false;
ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+
+ private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+ private final Lock cacheReadLock = cacheLock.readLock();
+ private final Lock cacheWriteLock = cacheLock.writeLock();
+
private static String PREFIX = "disruptor-";
private String _queueName = "";
@@ -62,6 +69,13 @@ public class DisruptorQueue implements IStatefulObject {
_buffer.setGatingSequences(_consumer);
if(claim instanceof SingleThreadedClaimStrategy) {
consumerStartedFlag = true;
+ } else {
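+ // publish the FLUSH_CACHE marker first, so messages cached before the consumer starts are flushed ahead of anything later published directly to the ring buffer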
+ try {
+ publishDirect(FLUSH_CACHE, true);
+ } catch (InsufficientCapacityException e) {
+ throw new RuntimeException("This code should be unreachable!");
+ }
}
}
@@ -134,33 +148,47 @@ public class DisruptorQueue implements IStatefulObject {
}
+ /**
+ * Enqueues obj. While consumerStartedFlag is false, obj is appended to _cache; once it is true,
+ * obj is written straight to the ring buffer via publishDirect.
+ * Cached messages are presumably drained when the consumer handles the FLUSH_CACHE marker that
+ * the constructor publishes for non-single-threaded claim strategies -- TODO confirm in the
+ * consumer-side code.
+ *
+ * @param obj the message to enqueue
+ * @param block forwarded to publishDirect; only used on the ring-buffer path
+ * @throws InsufficientCapacityException propagated from publishDirect (the cache path never throws it)
+ */
public void publish(Object obj, boolean block) throws InsufficientCapacityException {
- if(consumerStartedFlag) {
- final long id;
- if(block) {
- id = _buffer.next();
- } else {
- id = _buffer.tryNext(1);
+
+ // Fast path: once the consumer has started, publish without taking any lock.
+ boolean publishNow = consumerStartedFlag;
+
+ if (!publishNow) {
+ // Slow path: re-read the flag under the read lock. consumerStarted() sets the flag and then
+ // acquires the write lock, so every cache add that observed the flag as false completes
+ // before consumerStarted() returns.
+ cacheReadLock.lock();
+ try {
+ publishNow = consumerStartedFlag;
+ if (!publishNow) {
+ _cache.add(obj);
+ }
+ } finally {
+ cacheReadLock.unlock();
}
- final MutableObject m = _buffer.get(id);
- m.setObject(obj);
- _buffer.publish(id);
- } else {
- _cache.add(obj);
- if(consumerStartedFlag) flushCache();
+ }
+
+ // Either the flag was already set, or it was set before we re-read it under the read lock.
+ if (publishNow) {
+ publishDirect(obj, block);
}
}
- public void consumerStarted() {
- if(!consumerStartedFlag) {
- consumerStartedFlag = true;
- flushCache();
+ /**
+ * Claims a ring-buffer slot, stores obj in it and publishes the slot, bypassing _cache.
+ * Called by publish once the consumer has started, and by the constructor to enqueue FLUSH_CACHE.
+ *
+ * @param obj the message to store
+ * @param block if true the slot is claimed with _buffer.next(); otherwise with _buffer.tryNext(1)
+ * @throws InsufficientCapacityException on the tryNext path when no slot is available
+ */
+ private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+ final long id;
+ if(block) {
+ id = _buffer.next();
+ } else {
+ id = _buffer.tryNext(1);
}
+ // Fill the claimed slot before publishing its sequence to the consumer side.
+ final MutableObject m = _buffer.get(id);
+ m.setObject(obj);
+ _buffer.publish(id);
}
- private void flushCache() {
- publish(FLUSH_CACHE);
- }
+    /**
+     * Marks the consumer as started, so that subsequent publish calls go straight to the ring
+     * buffer instead of the pre-start cache.
+     * <p>
+     * A producer that saw the flag as false appends to the cache while holding the read lock.
+     * Acquiring and then releasing the write lock therefore waits until every such in-flight
+     * cache append has completed before this method returns.
+     */
+    public void consumerStarted() {
+        this.consumerStartedFlag = true;
+
+        cacheWriteLock.lock();
+        try {
+            // Nothing to guard: obtaining the write lock, even briefly, is the synchronization point.
+        } finally {
+            cacheWriteLock.unlock();
+        }
+    }
+
+ /** Returns writePos() - readPos(); presumably the number of published entries not yet consumed (readPos() is defined outside this hunk). */
public long population() { return (writePos() - readPos()); }
+ /** Returns the size of the underlying ring buffer (_buffer.getBufferSize()). */
public long capacity() { return _buffer.getBufferSize(); }
+ /** Returns the ring buffer's cursor (_buffer.getCursor()); presumably the highest sequence published so far -- confirm for the Disruptor version in use. */
public long writePos() { return _buffer.getCursor(); }
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/72b1f592/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
new file mode 100644
index 0000000..f21b10f
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.InsufficientCapacityException;
+import com.lmax.disruptor.MultiThreadedClaimStrategy;
+import org.junit.Assert;
+import org.junit.Test;
+import junit.framework.TestCase;
+
+public class DisruptorQueueTest extends TestCase {
+
+    private static final int TIMEOUT = 5; // MS
+    private static final int PRODUCER_NUM = 4;
+
+    /**
+     * Checks that a message published before the consumer starts is the first one the consumer
+     * receives, even while other producers publish concurrently during consumer start-up.
+     */
+    @Test
+    public void testMessageDisorder() throws InterruptedException {
+
+        // Set the queue length large enough to hold several messages.
+        DisruptorQueue queue = createQueue("messageOrder", 16);
+
+        queue.publish("1");
+
+        Runnable producer = new Producer(queue, "2");
+
+        // Written by the consumer thread and read by the test thread, so use an
+        // AtomicReference rather than a plain array slot for safe publication.
+        final AtomicReference<Object> result = new AtomicReference<Object>();
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            private boolean head = true;
+
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                // Only the first delivered message is recorded.
+                if (head) {
+                    head = false;
+                    result.set(obj);
+                }
+            }
+        });
+
+        // Run the producers and the consumer first; asserting before this point would
+        // inspect a result that nothing has written yet.
+        run(producer, consumer);
+        Assert.assertEquals("We expect to receive first published message first, but received " + result.get(),
+                "1", result.get());
+    }
+
+    /**
+     * Checks that the consumer receives at least one message when several producers contend
+     * for a ring buffer with a single slot.
+     */
+    @Test
+    public void testConsumerHang() throws InterruptedException {
+        final AtomicBoolean messageConsumed = new AtomicBoolean(false);
+
+        // Set the queue length to 1 so that the RingBuffer fills up easily,
+        // which triggers the consumer-blocking scenario.
+        DisruptorQueue queue = createQueue("consumerHang", 1);
+        Runnable producer = new Producer(queue, "msg");
+        Runnable consumer = new Consumer(queue, new EventHandler<Object>() {
+            @Override
+            public void onEvent(Object obj, long sequence, boolean endOfBatch)
+                    throws Exception {
+                messageConsumed.set(true);
+            }
+        });
+
+        run(producer, consumer);
+        Assert.assertTrue("disruptor message is never consumed due to consumer thread hangs",
+                messageConsumed.get());
+    }
+
+    /**
+     * Starts PRODUCER_NUM threads running {@code producer} and one thread running
+     * {@code consumer}, then interrupts each thread and waits up to TIMEOUT ms for it to exit.
+     * Threads still running after their join timeout are left behind.
+     */
+    private void run(Runnable producer, Runnable consumer)
+            throws InterruptedException {
+
+        Thread[] producerThreads = new Thread[PRODUCER_NUM];
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i] = new Thread(producer);
+            producerThreads[i].start();
+        }
+
+        Thread consumerThread = new Thread(consumer);
+        consumerThread.start();
+
+        // Producers do not check for interrupts; they stop once publish throws
+        // InsufficientCapacityException. The consumer stops when consumeBatchWhenAvailable
+        // throws a RuntimeException (presumably caused by the interrupt).
+        for (int i = 0; i < PRODUCER_NUM; i++) {
+            producerThreads[i].interrupt();
+            producerThreads[i].join(TIMEOUT);
+        }
+        consumerThread.interrupt();
+        consumerThread.join(TIMEOUT);
+    }
+
+    /** Publishes {@code msg} without blocking until the queue reports insufficient capacity. */
+    private static class Producer implements Runnable {
+        private final String msg;
+        private final DisruptorQueue queue;
+
+        Producer(DisruptorQueue queue, String msg) {
+            this.msg = msg;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    queue.publish(msg, false);
+                }
+            } catch (InsufficientCapacityException e) {
+                // The queue is full: this producer is done.
+                return;
+            }
+        }
+    }
+
+    /** Marks the consumer as started, then consumes batches until a RuntimeException ends the loop. */
+    private static class Consumer implements Runnable {
+        private final EventHandler<Object> handler;
+        private final DisruptorQueue queue;
+
+        Consumer(DisruptorQueue queue, EventHandler<Object> handler) {
+            this.handler = handler;
+            this.queue = queue;
+        }
+
+        @Override
+        public void run() {
+            queue.consumerStarted();
+            try {
+                while (true) {
+                    queue.consumeBatchWhenAvailable(handler);
+                }
+            } catch (RuntimeException expected) {
+                // Deliberately ignored: this is how the consumer loop is stopped.
+            }
+        }
+    }
+
+    private static DisruptorQueue createQueue(String name, int queueSize) {
+        return new DisruptorQueue(name, new MultiThreadedClaimStrategy(
+                queueSize), new BlockingWaitStrategy());
+    }
+}