You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/07/31 02:23:19 UTC
[4/8] git commit: merge
merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b47932d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b47932d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b47932d8
Branch: refs/heads/master
Commit: b47932d899917a8a2d1a5d9a671a448b147a7432
Parents: 7d5b348 8da9572
Author: Boris Aksenov <ak...@corp.finam.ru>
Authored: Sat Jul 5 16:02:32 2014 +0400
Committer: Boris Aksenov <ak...@corp.finam.ru>
Committed: Sat Jul 5 16:02:32 2014 +0400
----------------------------------------------------------------------
CHANGELOG.md | 20 +
DEVELOPER.md | 30 +
LICENSE | 39 +-
README.markdown | 36 +-
STORM-UI-REST-API.md | 546 +++++++++++++
bin/storm | 65 +-
conf/defaults.yaml | 2 +
conf/storm_env.ini | 2 +-
examples/storm-starter/README.markdown | 27 +-
.../src/jvm/storm/starter/RollingTopWords.java | 62 +-
.../src/jvm/storm/starter/util/StormRunner.java | 8 +
external/storm-kafka/README.md | 33 +-
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 9 +-
.../src/jvm/storm/kafka/Partition.java | 9 +-
.../src/jvm/storm/kafka/PartitionManager.java | 2 +-
.../storm/kafka/DynamicBrokersReaderTest.java | 17 +
.../src/test/storm/kafka/KafkaErrorTest.java | 17 +
.../src/test/storm/kafka/KafkaTestBroker.java | 17 +
.../src/test/storm/kafka/KafkaUtilsTest.java | 17 +
.../storm/kafka/StringKeyValueSchemeTest.java | 17 +
.../src/test/storm/kafka/TestUtils.java | 17 +
.../src/test/storm/kafka/ZkCoordinatorTest.java | 17 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 17 +
pom.xml | 17 +-
storm-core/pom.xml | 7 +
.../src/clj/backtype/storm/LocalCluster.clj | 76 +-
storm-core/src/clj/backtype/storm/LocalDRPC.clj | 5 +-
storm-core/src/clj/backtype/storm/bootstrap.clj | 9 +-
storm-core/src/clj/backtype/storm/clojure.clj | 10 +-
storm-core/src/clj/backtype/storm/cluster.clj | 431 ++++++-----
storm-core/src/clj/backtype/storm/config.clj | 145 ++--
.../src/clj/backtype/storm/daemon/common.clj | 2 +-
.../src/clj/backtype/storm/daemon/drpc.clj | 134 ++--
.../src/clj/backtype/storm/daemon/executor.clj | 3 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 181 +++--
.../src/clj/backtype/storm/daemon/nimbus.clj | 10 +-
.../clj/backtype/storm/daemon/supervisor.clj | 25 +-
.../src/clj/backtype/storm/daemon/worker.clj | 11 +-
storm-core/src/clj/backtype/storm/disruptor.clj | 77 +-
storm-core/src/clj/backtype/storm/event.clj | 49 +-
storm-core/src/clj/backtype/storm/log.clj | 22 +-
.../clj/backtype/storm/process_simulator.clj | 27 +-
storm-core/src/clj/backtype/storm/stats.clj | 289 +++----
storm-core/src/clj/backtype/storm/testing.clj | 546 ++++++-------
storm-core/src/clj/backtype/storm/testing4j.clj | 58 +-
storm-core/src/clj/backtype/storm/thrift.clj | 255 +++---
storm-core/src/clj/backtype/storm/timer.clj | 106 +--
storm-core/src/clj/backtype/storm/tuple.clj | 7 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 605 ++++++++-------
.../src/clj/backtype/storm/ui/helpers.clj | 5 +
storm-core/src/clj/backtype/storm/util.clj | 767 ++++++++++---------
storm-core/src/clj/backtype/storm/zookeeper.clj | 158 ++--
.../src/dev/resources/tester_bolt_metrics.py | 35 +
.../src/dev/resources/tester_spout_metrics.py | 51 ++
storm-core/src/jvm/backtype/storm/Config.java | 29 +
.../jvm/backtype/storm/generated/ErrorInfo.java | 188 ++++-
.../netty/NettyRenameThreadFactory.java | 19 +-
.../metric/api/rpc/AssignableShellMetric.java | 30 +
.../metric/api/rpc/CombinedShellMetric.java | 31 +
.../storm/metric/api/rpc/CountShellMetric.java | 38 +
.../storm/metric/api/rpc/IShellMetric.java | 31 +
.../metric/api/rpc/ReducedShellMetric.java | 32 +
.../jvm/backtype/storm/multilang/BoltMsg.java | 17 +
.../backtype/storm/multilang/ISerializer.java | 17 +
.../storm/multilang/JsonSerializer.java | 27 +
.../storm/multilang/NoOutputException.java | 17 +
.../jvm/backtype/storm/multilang/ShellMsg.java | 37 +
.../jvm/backtype/storm/multilang/SpoutMsg.java | 17 +
.../jvm/backtype/storm/spout/ShellSpout.java | 44 +-
.../src/jvm/backtype/storm/task/ShellBolt.java | 47 +-
.../backtype/storm/task/TopologyContext.java | 35 +
.../storm/testing/PythonShellMetricsBolt.java | 32 +
.../storm/testing/PythonShellMetricsSpout.java | 35 +
.../storm/ui/InvalidRequestException.java | 20 +
.../backtype/storm/utils/DisruptorQueue.java | 100 ++-
.../jvm/backtype/storm/utils/ShellProcess.java | 46 +-
.../trident/state/map/RemovableMapState.java | 17 +
.../topology/MasterBatchCoordinator.java | 4 +
storm-core/src/multilang/py/storm.py | 3 +
storm-core/src/py/storm/ttypes.py | 28 +-
storm-core/src/storm.thrift | 2 +
storm-core/src/ui/public/component.html | 38 +-
storm-core/src/ui/public/css/style.css | 1 +
storm-core/src/ui/public/index.html | 16 +
storm-core/src/ui/public/js/visualization.js | 20 +-
.../templates/component-page-template.html | 27 +-
.../public/templates/index-page-template.html | 16 +
.../public/templates/json-error-template.html | 16 +
.../templates/topology-page-template.html | 24 +-
storm-core/src/ui/public/topology.html | 21 +-
.../test/clj/backtype/storm/cluster_test.clj | 8 +-
.../test/clj/backtype/storm/metrics_test.clj | 206 +++--
.../test/clj/backtype/storm/supervisor_test.clj | 45 +-
.../storm/utils/DisruptorQueueTest.java | 153 ++++
storm-dist/binary/LICENSE | 39 +-
95 files changed, 4731 insertions(+), 1991 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/storm-core/pom.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/disruptor.clj
index 72bf0a7,a723601..2bbeebd
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@@ -13,20 -13,20 +13,20 @@@
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
+
(ns backtype.storm.disruptor
(:import [backtype.storm.utils DisruptorQueue])
- (:import [com.lmax.disruptor BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy BusySpinWaitStrategy])
- (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy
- BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
++ (:import [com.lmax.disruptor BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy
+ BusySpinWaitStrategy])
+ (:import [com.lmax.disruptor.dsl ProducerType])
(:require [clojure [string :as str]])
(:require [clojure [set :as set]])
(:use [clojure walk])
- (:use [backtype.storm util log])
- )
+ (:use [backtype.storm util log]))
(def CLAIM-STRATEGY
- {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size)))
- :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))})
+ {:multi-threaded (ProducerType/MULTI)
- :single-threaded (ProducerType/SINGLE)
- })
++ :single-threaded (ProducerType/SINGLE)})
(def WAIT-STRATEGY
{:block (fn [] (BlockingWaitStrategy.))
@@@ -44,22 -42,23 +42,24 @@@
;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue.
;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
- ;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
+ ;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue,
;; unblocking the consumer
- (defnk disruptor-queue [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+ (defnk disruptor-queue
+ [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
(DisruptorQueue. queue-name
- ((CLAIM-STRATEGY claim-strategy) buffer-size)
+ (CLAIM-STRATEGY claim-strategy)
+ buffer-size
- (mk-wait-strategy wait-strategy)
- ))
+ (mk-wait-strategy wait-strategy)))
- (defn clojure-handler [afn]
+ (defn clojure-handler
+ [afn]
(reify com.lmax.disruptor.EventHandler
- (onEvent [this o seq-id batchEnd?]
- (afn o seq-id batchEnd?)
- )))
+ (onEvent
+ [this o seq-id batchEnd?]
+ (afn o seq-id batchEnd?))))
- (defmacro handler [& args]
+ (defmacro handler
+ [& args]
`(clojure-handler (fn ~@args)))
(defn publish
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b47932d8/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index df2545f,932af16..25bfcf8
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@@ -24,48 -25,64 +24,66 @@@ import com.lmax.disruptor.InsufficientC
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.SingleThreadedClaimStrategy;
+import com.lmax.disruptor.TimeoutException;
import com.lmax.disruptor.WaitStrategy;
+import com.lmax.disruptor.dsl.ProducerType;
+
import java.util.concurrent.ConcurrentLinkedQueue;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.HashMap;
-import java.util.Map;
import backtype.storm.metric.api.IStatefulObject;
-
/**
*
* A single consumer queue that uses the LMAX Disruptor. The key to the performance is
* the ability to catch up to the producer by processing tuples in batches.
*/
public class DisruptorQueue implements IStatefulObject {
- static final Object FLUSH_CACHE = new Object();
- static final Object INTERRUPT = new Object();
-
- RingBuffer<MutableObject> _buffer;
- Sequence _consumer;
- SequenceBarrier _barrier;
+ private static final Object FLUSH_CACHE = new Object();
+ private static final Object INTERRUPT = new Object();
+ private static final String PREFIX = "disruptor-";
+
- private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
- private final HashMap<String, Object> state = new HashMap<String, Object>(4);
+
+
+ private final String _queueName;
+ private final RingBuffer<MutableObject> _buffer;
+ private final Sequence _consumer;
+ private final SequenceBarrier _barrier;
+
// TODO: consider having a threadlocal cache of this variable to speed up reads?
volatile boolean consumerStartedFlag = false;
-
- ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
++ private final HashMap<String, Object> state = new HashMap<String, Object>(4);
++
++ private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
++
+
+ private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
+ private final Lock readLock = cacheLock.readLock();
+ private final Lock writeLock = cacheLock.writeLock();
-
- private static String PREFIX = "disruptor-";
- private String _queueName = "";
-
- public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
- this._queueName = PREFIX + queueName;
- _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
+ public DisruptorQueue(String queueName, ProducerType producerType, int bufferSize, WaitStrategy wait) {
++ this._queueName = PREFIX + queueName;
+ _buffer = RingBuffer.create(producerType, new ObjectEventFactory(), bufferSize, wait);
- _queueName = PREFIX + queueName;
_consumer = new Sequence();
_barrier = _buffer.newBarrier();
- _buffer.setGatingSequences(_consumer);
- if(claim instanceof SingleThreadedClaimStrategy) {
+ _buffer.addGatingSequences(_consumer);
- consumerStartedFlag = producerType == ProducerType.SINGLE;
++ if(producerType == ProducerType.SINGLE) {
+ consumerStartedFlag = true;
+ } else {
+ // make sure we flush the pending messages in cache first
+ try {
+ publishDirect(FLUSH_CACHE, true);
+ } catch (InsufficientCapacityException e) {
+ throw new RuntimeException("This code should be unreachable!", e);
+ }
+ }
}
-
+
public String getName() {
- return _queueName;
+ return _queueName;
}
-
-
+
public void consumeBatch(EventHandler<Object> handler) {
consumeBatchToCursor(_barrier.getCursor(), handler);
}
@@@ -85,12 -102,10 +103,12 @@@
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
+ } catch (TimeoutException e) {
+ throw new RuntimeException(e);
}
}
-
-
+
+
private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
for(long curr = _consumer.get() + 1; curr <= cursor; curr++) {
try {
@@@ -131,35 -146,49 +149,49 @@@
public void tryPublish(Object obj) throws InsufficientCapacityException {
publish(obj, false);
}
-
+
public void publish(Object obj, boolean block) throws InsufficientCapacityException {
- if(consumerStartedFlag) {
- final long id;
- if(block) {
- id = _buffer.next();
- } else {
- id = _buffer.tryNext(1);
+
+ boolean publishNow = consumerStartedFlag;
+
+ if (!publishNow) {
+ readLock.lock();
+ try {
+ publishNow = consumerStartedFlag;
+ if (!publishNow) {
+ _cache.add(obj);
+ }
+ } finally {
+ readLock.unlock();
}
- final MutableObject m = _buffer.get(id);
- m.setObject(obj);
- _buffer.publish(id);
- } else {
- _cache.add(obj);
- if(consumerStartedFlag) flushCache();
+ }
+
+ if (publishNow) {
+ publishDirect(obj, block);
}
}
-
- public void consumerStarted() {
- if(!consumerStartedFlag) {
- consumerStartedFlag = true;
- flushCache();
+
+ private void publishDirect(Object obj, boolean block) throws InsufficientCapacityException {
+ final long id;
+ if(block) {
+ id = _buffer.next();
+ } else {
+ id = _buffer.tryNext(1);
}
+ final MutableObject m = _buffer.get(id);
+ m.setObject(obj);
+ _buffer.publish(id);
}
+
+ public void consumerStarted() {
- private void flushCache() {
- publish(FLUSH_CACHE);
+ consumerStartedFlag = true;
+
+ // Use writeLock to make sure all pending cache add operations have completed
+ writeLock.lock();
+ writeLock.unlock();
}
-
+
public long population() { return (writePos() - readPos()); }
public long capacity() { return _buffer.getBufferSize(); }
public long writePos() { return _buffer.getCursor(); }