You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2015/09/23 00:23:48 UTC

[23/50] storm git commit: merging from upstream/master

merging from upstream/master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/dc6d559b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/dc6d559b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/dc6d559b

Branch: refs/heads/STORM-1040
Commit: dc6d559b4957898ea96cca3df2910faea0e296a1
Parents: ea868a4 2bb41a9
Author: Arun Mahadevan <ai...@hortonworks.com>
Authored: Sat Sep 19 16:35:37 2015 +0530
Committer: Arun Mahadevan <ai...@hortonworks.com>
Committed: Sat Sep 19 16:35:37 2015 +0530

----------------------------------------------------------------------
 CHANGELOG.md                                    |  17 +-
 DEVELOPER.md                                    |  21 +++
 README.markdown                                 |   4 +
 conf/defaults.yaml                              |   5 +
 docs/about/multi-language.md                    |   2 +-
 .../documentation/Setting-up-a-Storm-cluster.md |  10 +-
 docs/documentation/Tutorial.md                  |   2 +-
 .../trident/TridentEsTopology.java              |   2 +-
 .../trident/OpaqueTridentEventHubSpout.java     |   2 +-
 .../TransactionalTridentEventHubSpout.java      |   2 +-
 .../org/apache/storm/hbase/common/Utils.java    |  10 +-
 .../storm/hdfs/trident/FixedBatchSpout.java     |   2 +-
 .../storm/hive/trident/TridentHiveTopology.java |   2 +-
 external/storm-kafka/README.md                  |  46 +++---
 storm-core/pom.xml                              |   1 +
 storm-core/src/clj/backtype/storm/cluster.clj   |  46 +++++-
 storm-core/src/clj/backtype/storm/config.clj    |  13 +-
 .../backtype/storm/daemon/builtin_metrics.clj   |  25 ++-
 .../src/clj/backtype/storm/daemon/executor.clj  | 154 ++++++++++++-------
 .../src/clj/backtype/storm/daemon/nimbus.clj    |   2 +
 .../clj/backtype/storm/daemon/supervisor.clj    |   3 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |  78 +++++++++-
 storm-core/src/clj/backtype/storm/disruptor.clj |  19 ++-
 storm-core/src/clj/backtype/storm/util.clj      |   4 +
 storm-core/src/jvm/backtype/storm/Config.java   |  23 +++
 .../coordination/BatchSubtopologyBuilder.java   |   4 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |   7 +-
 .../backtype/storm/messaging/netty/Client.java  |  37 +++--
 .../topology/BaseConfigurationDeclarer.java     |   2 +-
 .../ComponentConfigurationDeclarer.java         |   2 +-
 .../storm/topology/TopologyBuilder.java         |   2 +-
 .../TransactionalTopologyBuilder.java           |  10 +-
 .../src/jvm/backtype/storm/tuple/Fields.java    |  21 ++-
 .../src/jvm/backtype/storm/tuple/ITuple.java    | 126 ++++++++++++---
 .../utils/DisruptorBackpressureCallback.java    |  27 ++++
 .../backtype/storm/utils/DisruptorQueue.java    | 101 +++++++++++-
 .../jvm/backtype/storm/utils/RateTracker.java   | 119 ++++++++++++++
 .../storm/utils/WorkerBackpressureCallback.java |  26 ++++
 .../storm/utils/WorkerBackpressureThread.java   |  59 +++++++
 .../storm/trident/spout/BatchSpoutExecutor.java |   2 +-
 .../jvm/storm/trident/spout/IBatchSpout.java    |   2 +-
 .../spout/IOpaquePartitionedTridentSpout.java   |   2 +-
 .../trident/spout/IPartitionedTridentSpout.java |   2 +-
 .../jvm/storm/trident/spout/ITridentSpout.java  |   2 +-
 .../trident/spout/RichSpoutBatchExecutor.java   |   2 +-
 .../storm/trident/testing/FeederBatchSpout.java |   2 +-
 .../testing/FeederCommitterBatchSpout.java      |   2 +-
 .../storm/trident/testing/FixedBatchSpout.java  |   2 +-
 .../topology/TridentTopologyBuilder.java        |  18 +--
 .../test/clj/backtype/storm/config_test.clj     |  28 +++-
 .../test/clj/backtype/storm/supervisor_test.clj |   2 +
 .../utils/DisruptorQueueBackpressureTest.java   | 115 ++++++++++++++
 .../backtype/storm/utils/RateTrackerTest.java   |  62 ++++++++
 53 files changed, 1091 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/executor.clj
index 829a0b4,0683f38..d7a68be
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@@ -504,18 -532,8 +548,9 @@@
          receive-queue (:receive-queue executor-data)
          event-handler (mk-task-receiver executor-data tuple-action-fn)
          has-ackers? (has-ackers? storm-conf)
 +        has-eventloggers? (has-eventloggers? storm-conf)
          emitted-count (MutableLong. 0)
-         empty-emit-streak (MutableLong. 0)
-         
-         ;; the overflow buffer is used to ensure that spouts never block when emitting
-         ;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
-         ;; prevents deadlock from occuring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
-         ;; buffers filled up)
-         ;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
-         ;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple, 
-         ;; preventing memory issues
-         overflow-buffer (ConcurrentLinkedQueue.)]
+         empty-emit-streak (MutableLong. 0)]
     
      [(async-loop
        (fn []
@@@ -710,17 -750,7 +769,8 @@@
                                      (stats/bolt-execute-tuple! executor-stats
                                                                 (.getSourceComponent tuple)
                                                                 (.getSourceStreamId tuple)
 -                                                               delta)))))))]
 +                                                               delta)))))))
- 
-         ;; the overflow buffer is used to ensure that bolts do not block when emitting
-         ;; this ensures that the bolt can always clear the incoming messages, which
-         ;; prevents deadlock from occurs across the topology
-         ;; (e.g. Spout -> BoltA -> Splitter -> BoltB -> BoltA, and all
-         ;; buffers filled up)
-         ;; the overflow buffer is might gradually fill degrading the performance gradually
-         ;; eventually running out of memory, but at least prevent live-locks/deadlocks.
-         overflow-buffer (if (storm-conf TOPOLOGY-BOLTS-OUTGOING-OVERFLOW-BUFFER-ENABLE) (ConcurrentLinkedQueue.) nil)
 +        has-eventloggers? (has-eventloggers? storm-conf)]
      
      ;; TODO: can get any SubscribedState objects out of the context now
  

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index b542348,f795daa..781a959
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -137,11 -170,11 +170,11 @@@
                          (.add remote (TaskMessage. task (.serialize serializer tuple)))
                          (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
                       ))))
-                 (local-transfer local)
-                 (disruptor/publish transfer-queue remoteMap)
-               ))]
+ 
+               (local-transfer local)
+               (disruptor/publish transfer-queue remoteMap)))]
      (if try-serialize-local
 -      (do 
 +      (do
          (log-warn "WILL TRY TO SERIALIZE ALL TUPLES (Turn off " TOPOLOGY-TESTING-ALWAYS-TRY-SERIALIZE " for production)")
          (fn [^KryoTupleSerializer serializer tuple-batch]
            (assert-can-serialize serializer tuple-batch)

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/clj/backtype/storm/util.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/dc6d559b/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------