You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/20 17:24:52 UTC

[2/5] git commit: Merge branch 'apache-master' into jni

Merge branch 'apache-master' into jni

Conflicts:
	storm-core/src/clj/backtype/storm/daemon/supervisor.clj


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/0cce4b82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/0cce4b82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/0cce4b82

Branch: refs/heads/master
Commit: 0cce4b8254d7fec9565bb94ea48fb7afec540cd2
Parents: 486ecc8 dc4de42
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Mon Apr 28 14:54:50 2014 +0000
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Mon Apr 28 14:54:50 2014 +0000

----------------------------------------------------------------------
 CHANGELOG.md                                    |  31 ++
 NOTICE                                          |   9 +-
 README.markdown                                 |   5 +-
 bin/storm                                       |  22 +-
 bin/storm-config.cmd                            |  30 +-
 bin/storm.cmd                                   |  21 +-
 examples/storm-starter/README.markdown          | 108 ++++++
 .../multilang/resources/splitsentence.py        |  24 ++
 .../multilang/resources/splitsentence.rb        |  26 ++
 .../storm-starter/multilang/resources/storm.py  | 221 +++++++++++
 .../storm-starter/multilang/resources/storm.rb  | 200 ++++++++++
 examples/storm-starter/pom.xml                  | 161 ++++++++
 .../src/clj/storm/starter/clj/word_count.clj    |  95 +++++
 .../jvm/storm/starter/BasicDRPCTopology.java    |  78 ++++
 .../jvm/storm/starter/ExclamationTopology.java  |  87 +++++
 .../src/jvm/storm/starter/ManualDRPC.java       |  68 ++++
 .../jvm/storm/starter/PrintSampleStream.java    |  54 +++
 .../src/jvm/storm/starter/ReachTopology.java    | 196 ++++++++++
 .../src/jvm/storm/starter/RollingTopWords.java  |  78 ++++
 .../jvm/storm/starter/SingleJoinExample.java    |  64 ++++
 .../storm/starter/TransactionalGlobalCount.java | 173 +++++++++
 .../jvm/storm/starter/TransactionalWords.java   | 246 +++++++++++++
 .../jvm/storm/starter/WordCountTopology.java    | 107 ++++++
 .../storm/starter/bolt/AbstractRankerBolt.java  | 110 ++++++
 .../starter/bolt/IntermediateRankingsBolt.java  |  58 +++
 .../src/jvm/storm/starter/bolt/PrinterBolt.java |  37 ++
 .../storm/starter/bolt/RollingCountBolt.java    | 142 +++++++
 .../jvm/storm/starter/bolt/SingleJoinBolt.java  | 114 ++++++
 .../storm/starter/bolt/TotalRankingsBolt.java   |  59 +++
 .../starter/spout/RandomSentenceSpout.java      |  64 ++++
 .../storm/starter/spout/TwitterSampleSpout.java | 122 ++++++
 .../tools/NthLastModifiedTimeTracker.java       |  70 ++++
 .../src/jvm/storm/starter/tools/Rankable.java   |  32 ++
 .../starter/tools/RankableObjectWithFields.java | 148 ++++++++
 .../src/jvm/storm/starter/tools/Rankings.java   | 156 ++++++++
 .../starter/tools/SlidingWindowCounter.java     | 119 ++++++
 .../storm/starter/tools/SlotBasedCounter.java   | 118 ++++++
 .../jvm/storm/starter/trident/TridentReach.java | 156 ++++++++
 .../storm/starter/trident/TridentWordCount.java |  85 +++++
 .../src/jvm/storm/starter/util/StormRunner.java |  39 ++
 .../jvm/storm/starter/util/TupleHelpers.java    |  33 ++
 .../bolt/IntermediateRankingsBoltTest.java      | 146 ++++++++
 .../starter/bolt/RollingCountBoltTest.java      | 113 ++++++
 .../starter/bolt/TotalRankingsBoltTest.java     | 147 ++++++++
 .../storm/starter/tools/MockTupleHelpers.java   |  40 ++
 .../tools/NthLastModifiedTimeTrackerTest.java   | 125 +++++++
 .../tools/RankableObjectWithFieldsTest.java     | 252 +++++++++++++
 .../jvm/storm/starter/tools/RankingsTest.java   | 368 +++++++++++++++++++
 .../starter/tools/SlidingWindowCounterTest.java | 106 ++++++
 .../starter/tools/SlotBasedCounterTest.java     | 181 +++++++++
 external/storm-kafka/CHANGELOG.md               |  13 +
 external/storm-kafka/README.md                  |  25 ++
 external/storm-kafka/pom.xml                    | 138 +++++++
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |  80 ++++
 .../src/jvm/storm/kafka/BrokerHosts.java        |  25 ++
 .../jvm/storm/kafka/DynamicBrokersReader.java   | 145 ++++++++
 .../kafka/DynamicPartitionConnections.java      |  94 +++++
 .../jvm/storm/kafka/FailedFetchException.java   |  29 ++
 .../src/jvm/storm/kafka/KafkaConfig.java        |  50 +++
 .../src/jvm/storm/kafka/KafkaError.java         |  43 +++
 .../src/jvm/storm/kafka/KafkaSpout.java         | 190 ++++++++++
 .../src/jvm/storm/kafka/KafkaUtils.java         | 235 ++++++++++++
 .../src/jvm/storm/kafka/KeyValueScheme.java     |  28 ++
 .../kafka/KeyValueSchemeAsMultiScheme.java      |  36 ++
 .../src/jvm/storm/kafka/Partition.java          |  64 ++++
 .../jvm/storm/kafka/PartitionCoordinator.java   |  26 ++
 .../src/jvm/storm/kafka/PartitionManager.java   | 241 ++++++++++++
 .../src/jvm/storm/kafka/SpoutConfig.java        |  36 ++
 .../src/jvm/storm/kafka/StaticCoordinator.java  |  48 +++
 .../src/jvm/storm/kafka/StaticHosts.java        |  38 ++
 .../storm/kafka/StaticPartitionConnections.java |  52 +++
 .../jvm/storm/kafka/StringKeyValueScheme.java   |  37 ++
 .../src/jvm/storm/kafka/StringScheme.java       |  46 +++
 .../src/jvm/storm/kafka/ZkCoordinator.java      | 112 ++++++
 .../src/jvm/storm/kafka/ZkHosts.java            |  36 ++
 .../src/jvm/storm/kafka/ZkState.java            | 116 ++++++
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  89 +++++
 .../jvm/storm/kafka/trident/Coordinator.java    |  50 +++
 .../storm/kafka/trident/DefaultCoordinator.java |  31 ++
 .../trident/GlobalPartitionInformation.java     |  99 +++++
 .../storm/kafka/trident/IBatchCoordinator.java  |  26 ++
 .../jvm/storm/kafka/trident/IBrokerReader.java  |  25 ++
 .../src/jvm/storm/kafka/trident/MaxMetric.java  |  40 ++
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |  59 +++
 .../storm/kafka/trident/StaticBrokerReader.java |  36 ++
 .../trident/TransactionalTridentKafkaSpout.java |  58 +++
 .../storm/kafka/trident/TridentKafkaConfig.java |  37 ++
 .../kafka/trident/TridentKafkaEmitter.java      | 269 ++++++++++++++
 .../jvm/storm/kafka/trident/ZkBrokerReader.java |  62 ++++
 .../storm/kafka/DynamicBrokersReaderTest.java   | 155 ++++++++
 .../src/test/storm/kafka/KafkaErrorTest.java    |  39 ++
 .../src/test/storm/kafka/KafkaTestBroker.java   |  60 +++
 .../src/test/storm/kafka/KafkaUtilsTest.java    | 221 +++++++++++
 .../storm/kafka/StringKeyValueSchemeTest.java   |  38 ++
 .../src/test/storm/kafka/TestUtils.java         |  20 +
 .../src/test/storm/kafka/ZkCoordinatorTest.java | 130 +++++++
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 171 +++++++++
 pom.xml                                         |  65 +++-
 storm-core/pom.xml                              |  50 ++-
 storm-core/src/clj/backtype/storm/config.clj    |   2 +-
 .../src/clj/backtype/storm/daemon/common.clj    |   6 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |  49 ++-
 .../src/clj/backtype/storm/daemon/worker.clj    |   2 +-
 storm-core/src/clj/backtype/storm/timer.clj     |   2 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  27 +-
 .../src/clj/backtype/storm/ui/helpers.clj       |   4 +-
 storm-core/src/clj/backtype/storm/util.clj      |  14 +-
 storm-core/src/clj/backtype/storm/zookeeper.clj |  10 +-
 storm-core/src/jvm/backtype/storm/Config.java   |   5 +-
 .../jvm/backtype/storm/ConfigValidation.java    |  17 +
 .../src/jvm/backtype/storm/StormSubmitter.java  |  24 +-
 .../backtype/storm/messaging/netty/Client.java  | 195 ++++++----
 .../backtype/storm/messaging/netty/Context.java |  30 +-
 .../storm/messaging/netty/MessageBatch.java     |  55 +--
 .../backtype/storm/messaging/netty/Server.java  |   2 +-
 .../messaging/netty/StormClientHandler.java     |  80 ++--
 .../transactional/state/TransactionalState.java |   2 +-
 .../src/jvm/backtype/storm/utils/Time.java      |  18 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  71 ++--
 .../trident/spout/RichSpoutBatchTriggerer.java  |   1 +
 .../jvm/storm/trident/state/map/OpaqueMap.java  |   6 +-
 .../trident/state/map/RemovableMapState.java    |   8 +
 .../storm/trident/testing/MemoryMapState.java   |  27 +-
 .../topology/state/TransactionalState.java      |   2 +-
 .../test/clj/backtype/storm/config_test.clj     |  20 +
 .../test/clj/backtype/storm/supervisor_test.clj |  66 +++-
 .../test/clj/backtype/storm/utils_test.clj      |  13 +-
 .../test/clj/storm/trident/state_test.clj       |  33 +-
 storm-dist/binary/NOTICE                        |   9 +-
 storm-dist/binary/src/main/assembly/binary.xml  |  31 ++
 130 files changed, 9535 insertions(+), 353 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0cce4b82/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index d9b6f14,2471626..2587e58
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@@ -438,13 -438,12 +438,19 @@@
        (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
        ))
  
 +(defn jlp [stormroot conf]
 +  (let [resource-root (str stormroot "/" RESOURCES-SUBDIR)
 +        os (clojure.string/replace (System/getProperty "os.name") #"\s+" "_")
 +        arch (System/getProperty "os.arch")
 +        arch-resource-root (str resource-root "/" os "-" arch)]
 +    (str arch-resource-root ":" resource-root ":" (conf JAVA-LIBRARY-PATH)))) 
 +
+ (defn- substitute-worker-childopts [value port]
+   (let [sub-fn (fn [s] (.replaceAll s "%ID%" (str port)))]
+     (if (list? value)
+       (map sub-fn value)
+       (-> value sub-fn (.split " ")))))
+ 
  (defmethod launch-worker
      :distributed [supervisor storm-id port worker-id]
      (let [conf (:conf supervisor)
@@@ -454,23 -452,34 +460,34 @@@
            stormjar (supervisor-stormjar-path stormroot)
            storm-conf (read-supervisor-storm-conf conf storm-id)
            classpath (add-to-classpath (current-classpath) [stormjar])
-           childopts (.replaceAll (str (conf WORKER-CHILDOPTS) " " (storm-conf TOPOLOGY-WORKER-CHILDOPTS))
-                                  "%ID%"
-                                  (str port))
+           worker-childopts (when-let [s (conf WORKER-CHILDOPTS)]
+                              (substitute-worker-childopts s port))
+           topo-worker-childopts (when-let [s (storm-conf TOPOLOGY-WORKER-CHILDOPTS)]
+                                   (substitute-worker-childopts s port))
            logfilename (str "worker-" port ".log")
-           command (str "java -server " childopts
-                        " -Djava.library.path=" jlp
-                        " -Dlogfile.name=" logfilename
-                        " -Dstorm.home=" storm-home
-                        " -Dlogback.configurationFile=" storm-home "/logback/cluster.xml"
-                        " -Dstorm.id=" storm-id
-                        " -Dworker.id=" worker-id
-                        " -Dworker.port=" port
-                        " -cp " classpath " backtype.storm.daemon.worker "
-                        (java.net.URLEncoder/encode storm-id) " " (:assignment-id supervisor)
-                        " " port " " worker-id)]
-       (log-message "Launching worker with command: " command)
+           command (concat
+                     ["java" "-server"]
+                     worker-childopts
+                     topo-worker-childopts
 -                    [(str "-Djava.library.path=" (conf JAVA-LIBRARY-PATH))
++                    [(str "-Djava.library.path=" jlp)
+                      (str "-Dlogfile.name=" logfilename)
+                      (str "-Dstorm.home=" storm-home)
+                      (str "-Dlogback.configurationFile=" storm-home "/logback/cluster.xml")
+                      (str "-Dstorm.id=" storm-id)
+                      (str "-Dworker.id=" worker-id)
+                      (str "-Dworker.port=" port)
+                      "-cp" classpath
+                      "backtype.storm.daemon.worker"
+                      storm-id
+                      (:assignment-id supervisor)
+                      port
+                      worker-id])
+           command (->> command (map str) (filter (complement empty?)))
+           shell-cmd (->> command
+                          (map #(str \' (clojure.string/escape % {\' "\\'"}) \'))
+                          (clojure.string/join " "))]
+       (log-message "Launching worker with command: " shell-cmd)
 -      (launch-process command :environment {"LD_LIBRARY_PATH" (conf JAVA-LIBRARY-PATH)})
 +      (launch-process command :environment {"LD_LIBRARY_PATH" jlp})
        ))
  
  ;; local implementation

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/0cce4b82/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/backtype/storm/supervisor_test.clj
index 9f9def9,54a3ee9..d4f2bdc
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@@ -239,6 -241,66 +241,68 @@@
  
        )))
  
+ (deftest test-worker-launch-command
+   (testing "*.worker.childopts configuration"
+     (let [mock-port "42"
+           mock-storm-id "fake-storm-id"
+           mock-worker-id "fake-worker-id"
+           mock-cp "mock-classpath"
+           exp-args-fn (fn [opts topo-opts]
+                        (concat ["java" "-server"]
+                                opts
+                                topo-opts
+                                ["-Djava.library.path="
+                                 (str "-Dlogfile.name=worker-" mock-port ".log")
+                                 "-Dstorm.home="
+                                 "-Dlogback.configurationFile=/logback/cluster.xml"
+                                 (str "-Dstorm.id=" mock-storm-id)
+                                 (str "-Dworker.id=" mock-worker-id)
+                                 (str "-Dworker.port=" mock-port)
+                                 "-cp" mock-cp
+                                 "backtype.storm.daemon.worker"
+                                 mock-storm-id
+                                 mock-port
+                                 mock-worker-id]))]
+       (testing "testing *.worker.childopts as strings with extra spaces"
+         (let [string-opts "-Dfoo=bar  -Xmx1024m"
+               topo-string-opts "-Dkau=aux   -Xmx2048m"
+               exp-args (exp-args-fn ["-Dfoo=bar" "-Xmx1024m"]
+                                     ["-Dkau=aux" "-Xmx2048m"])
+               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+                                       WORKER-CHILDOPTS string-opts}}]
+           (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                                    topo-string-opts}
+                      add-to-classpath mock-cp
+                      supervisor-stormdist-root nil
++                     supervisor/jlp nil
+                      launch-process nil]
+             (supervisor/launch-worker mock-supervisor
+                                       mock-storm-id
+                                       mock-port
+                                       mock-worker-id)
+             (verify-first-call-args-for-indices launch-process
+                                                 [0]
+                                                 exp-args))))
+       (testing "testing *.worker.childopts as list of strings, with spaces in values"
+         (let [list-opts '("-Dopt1='this has a space in it'" "-Xmx1024m")
+               topo-list-opts '("-Dopt2='val with spaces'" "-Xmx2048m")
+               exp-args (exp-args-fn list-opts topo-list-opts)
+               mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed
+                                       WORKER-CHILDOPTS list-opts}}]
+           (stubbing [read-supervisor-storm-conf {TOPOLOGY-WORKER-CHILDOPTS
+                                                    topo-list-opts}
+                      add-to-classpath mock-cp
+                      supervisor-stormdist-root nil
++                     supervisor/jlp nil
+                      launch-process nil]
+             (supervisor/launch-worker mock-supervisor
+                                       mock-storm-id
+                                       mock-port
+                                       mock-worker-id)
+             (verify-first-call-args-for-indices launch-process
+                                                 [0]
+                                                 exp-args)))))))
+ 
  (deftest test-workers-go-bananas
    ;; test that multiple workers are started for a port, and test that
    ;; supervisor shuts down propertly (doesn't shutdown the most