You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/09/23 00:23:48 UTC
[23/50] storm git commit: merging from upstream/master
merging from upstream/master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc6d559b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc6d559b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc6d559b
Branch: refs/heads/STORM-1040
Commit: dc6d559b4957898ea96cca3df2910faea0e296a1
Parents: ea868a4 2bb41a9
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Sat Sep 19 16:35:37 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Sat Sep 19 16:35:37 2015 +0530
----------------------------------------------------------------------
CHANGELOG.md | 17 +-
DEVELOPER.md | 21 +++
README.markdown | 4 +
conf/defaults.yaml | 5 +
docs/about/multi-language.md | 2 +-
.../documentation/Setting-up-a-Storm-cluster.md | 10 +-
docs/documentation/Tutorial.md | 2 +-
.../trident/TridentEsTopology.java | 2 +-
.../trident/OpaqueTridentEventHubSpout.java | 2 +-
.../TransactionalTridentEventHubSpout.java | 2 +-
.../org/apache/storm/hbase/common/Utils.java | 10 +-
.../storm/hdfs/trident/FixedBatchSpout.java | 2 +-
.../storm/hive/trident/TridentHiveTopology.java | 2 +-
external/storm-kafka/README.md | 46 +++---
storm-core/pom.xml | 1 +
storm-core/src/clj/backtype/storm/cluster.clj | 46 +++++-
storm-core/src/clj/backtype/storm/config.clj | 13 +-
.../backtype/storm/daemon/builtin_metrics.clj | 25 ++-
.../src/clj/backtype/storm/daemon/executor.clj | 154 ++++++++++++-------
.../src/clj/backtype/storm/daemon/nimbus.clj | 2 +
.../clj/backtype/storm/daemon/supervisor.clj | 3 +-
.../src/clj/backtype/storm/daemon/worker.clj | 78 +++++++++-
storm-core/src/clj/backtype/storm/disruptor.clj | 19 ++-
storm-core/src/clj/backtype/storm/util.clj | 4 +
storm-core/src/jvm/backtype/storm/Config.java | 23 +++
.../coordination/BatchSubtopologyBuilder.java | 4 +-
.../storm/drpc/LinearDRPCTopologyBuilder.java | 7 +-
.../backtype/storm/messaging/netty/Client.java | 37 +++--
.../topology/BaseConfigurationDeclarer.java | 2 +-
.../ComponentConfigurationDeclarer.java | 2 +-
.../storm/topology/TopologyBuilder.java | 2 +-
.../TransactionalTopologyBuilder.java | 10 +-
.../src/jvm/backtype/storm/tuple/Fields.java | 21 ++-
.../src/jvm/backtype/storm/tuple/ITuple.java | 126 ++++++++++++---
.../utils/DisruptorBackpressureCallback.java | 27 ++++
.../backtype/storm/utils/DisruptorQueue.java | 101 +++++++++++-
.../jvm/backtype/storm/utils/RateTracker.java | 119 ++++++++++++++
.../storm/utils/WorkerBackpressureCallback.java | 26 ++++
.../storm/utils/WorkerBackpressureThread.java | 59 +++++++
.../storm/trident/spout/BatchSpoutExecutor.java | 2 +-
.../jvm/storm/trident/spout/IBatchSpout.java | 2 +-
.../spout/IOpaquePartitionedTridentSpout.java | 2 +-
.../trident/spout/IPartitionedTridentSpout.java | 2 +-
.../jvm/storm/trident/spout/ITridentSpout.java | 2 +-
.../trident/spout/RichSpoutBatchExecutor.java | 2 +-
.../storm/trident/testing/FeederBatchSpout.java | 2 +-
.../testing/FeederCommitterBatchSpout.java | 2 +-
.../storm/trident/testing/FixedBatchSpout.java | 2 +-
.../topology/TridentTopologyBuilder.java | 18 +--
.../test/clj/backtype/storm/config_test.clj | 28 +++-
.../test/clj/backtype/storm/supervisor_test.clj | 2 +
.../utils/DisruptorQueueBackpressureTest.java | 115 ++++++++++++++
.../backtype/storm/utils/RateTrackerTest.java | 62 ++++++++
53 files changed, 1091 insertions(+), 190 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/conf/defaults.yaml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/executor.clj
index 829a0b4,0683f38..d7a68be
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@@ -504,18 -532,8 +548,9 @@@
receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)
has-ackers? (has-ackers? storm-conf)
+ has-eventloggers? (has-eventloggers? storm-conf)
emitted-count (MutableLong. 0)
- empty-emit-streak (MutableLong. 0)
-
- ;; the overflow buffer is used to ensure that spouts never block when emitting
- ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
- ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
- ;; buffers filled up)
- ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
- ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
- ;; preventing memory issues
- overflow-buffer (ConcurrentLinkedQueue.)]
+ empty-emit-streak (MutableLong. 0)]
[(async-loop
(fn []
@@@ -710,17 -750,7 +769,8 @@@
(stats/bolt-execute-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)))))))]
+ delta)))))))
-
- ;; the overflow buffer is used to ensure that bolts do not block when emitting
- ;; this ensures that the bolt can always clear the incoming messages, which
- ;; prevents deadlock from occurs across the topology
- ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
- ;; buffers filled up)
- ;; the overflow buffer is might gradually fill degrading the performance gradually
- ;; eventually running out of memory, but at least prevent live-locks/deadlocks.
- overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)
+ has-eventloggers? (has-eventloggers? storm-conf)]
;; TODO: can get any SubscribedState objects out of the context now
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index b542348,f795daa..781a959
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -137,11 -170,11 +170,11 @@@
(.add remote (TaskMessage. task (.serialize serializer tuple)))
(log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
))))
- (local-transfer local)
- (disruptor/publish transfer-queue remoteMap)
- ))]
+
+ (local-transfer local)
+ (disruptor/publish transfer-queue remoteMap)))]
(if try-serialize-local
- (do
+ (do
(log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
(fn [^KryoTupleSerializer serializer tuple-batch]
(assert-can-serialize serializer tuple-batch)
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------