You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/03/01 00:58:24 UTC
[01/13] storm git commit: [STORM-1245] port backtype.storm.daemon.acker to java
Repository: storm
Updated Branches:
refs/heads/master 07629c1f8 -> c8138dc72
[STORM-1245] port backtype.storm.daemon.acker to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9dc271f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9dc271f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9dc271f
Branch: refs/heads/master
Commit: e9dc271f11e311eea2269a8f6035e2c322d8d520
Parents: 66d7a39
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 1 11:17:05 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 1 11:17:05 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/acker.clj | 107 ---------------
.../src/clj/org/apache/storm/daemon/common.clj | 12 +-
storm-core/src/clj/org/apache/storm/testing.clj | 11 +-
.../src/jvm/org/apache/storm/daemon/Acker.java | 133 +++++++++++++++++++
.../src/jvm/org/apache/storm/utils/Utils.java | 56 ++++++++
5 files changed, 203 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
deleted file mode 100644
index 7c4d614..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ /dev/null
@@ -1,107 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.acker
- (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
- (:import [org.apache.storm.tuple Tuple Fields])
- (:import [org.apache.storm.utils RotatingMap MutableObject])
- (:import [java.util List Map])
- (:import [org.apache.storm Constants])
- (:use [org.apache.storm config util log])
- (:gen-class
- :init init
- :implements [org.apache.storm.task.IBolt]
- :constructors {[] []}
- :state state ))
-
-(def ACKER-COMPONENT-ID "__acker")
-(def ACKER-INIT-STREAM-ID "__ack_init")
-(def ACKER-ACK-STREAM-ID "__ack_ack")
-(def ACKER-FAIL-STREAM-ID "__ack_fail")
-
-(defn- update-ack [curr-entry val]
- (let [old (get curr-entry :val 0)]
- (assoc curr-entry :val (bit-xor old val))
- ))
-
-(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^String stream ^List values]
- (.emitDirect collector task stream values)
- )
-
-(defn mk-acker-bolt []
- (let [output-collector (MutableObject.)
- pending (MutableObject.)]
- (reify IBolt
- (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
- (.setObject output-collector collector)
- (.setObject pending (RotatingMap. 2))
- )
- (^void execute [this ^Tuple tuple]
- (let [^RotatingMap pending (.getObject pending)
- stream-id (.getSourceStreamId tuple)]
- (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
- (.rotate pending)
- (let [id (.getValue tuple 0)
- ^OutputCollector output-collector (.getObject output-collector)
- curr (.get pending id)
- curr (condp = stream-id
- ACKER-INIT-STREAM-ID (-> curr
- (update-ack (.getValue tuple 1))
- (assoc :spout-task (.getValue tuple 2)))
- ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
- ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
- (.put pending id curr)
- (when (and curr (:spout-task curr))
- (cond (= 0 (:val curr))
- (do
- (.remove pending id)
- (acker-emit-direct output-collector
- (:spout-task curr)
- ACKER-ACK-STREAM-ID
- [id]
- ))
- (:failed curr)
- (do
- (.remove pending id)
- (acker-emit-direct output-collector
- (:spout-task curr)
- ACKER-FAIL-STREAM-ID
- [id]
- ))
- ))
- (.ack output-collector tuple)
- ))))
- (^void cleanup [this]
- )
- )))
-
-(defn -init []
- [[] (container)])
-
-(defn -prepare [this conf context collector]
- (let [^IBolt ret (mk-acker-bolt)]
- (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
- (.prepare ret conf context collector)
- ))
-
-(defn -execute [this tuple]
- (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
- (.execute delegate tuple)
- ))
-
-(defn -cleanup [this]
- (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
- (.cleanup delegate)
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6c184fd..45e0582 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -15,6 +15,7 @@
;; limitations under the License.
(ns org.apache.storm.daemon.common
(:use [org.apache.storm log config util])
+ (:import [org.apache.storm.daemon Acker])
(:import [org.apache.storm.generated StormTopology
InvalidTopologyException GlobalStreamId]
[org.apache.storm.utils ThriftTopologyUtils])
@@ -26,17 +27,16 @@
(:import [org.apache.storm.security.auth IAuthorizer])
(:import [java.io InterruptedIOException])
(:require [clojure.set :as set])
- (:require [org.apache.storm.daemon.acker :as acker])
(:require [org.apache.storm.thrift :as thrift])
(:require [metrics.reporters.jmx :as jmx]))
(defn start-metrics-reporters []
(jmx/start (jmx/reporter {})))
-(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
-(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
-(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
-(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
+(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
+(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
+(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
+(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
(def SYSTEM-STREAM-ID "__system")
@@ -207,7 +207,7 @@
(defn add-acker! [storm-conf ^StormTopology ret]
(let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
- (new org.apache.storm.daemon.acker)
+ (Acker. )
{ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index cc78659..08662ff 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -45,9 +45,9 @@
(:import [org.apache.storm.tuple Tuple])
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.task TopologyContext])
+ (:import [org.apache.storm.daemon Acker])
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.messaging.loader :as msg-loader])
- (:require [org.apache.storm.daemon.acker :as acker])
(:use [org.apache.storm cluster util thrift config log local-state]))
(defn feeder-spout
@@ -612,6 +612,11 @@
(get key)
.get))
+;; Temporary solution. It should be removed after migration.
+(defn mk-acker-bolt
+ []
+ (Acker.))
+
(defmacro with-tracked-cluster
[[cluster-sym & cluster-args] & body]
`(let [id# (uuid)]
@@ -622,8 +627,8 @@
(.put "transferred" (AtomicInteger. 0))
(.put "processed" (AtomicInteger. 0))))
(with-var-roots
- [acker/mk-acker-bolt
- (let [old# acker/mk-acker-bolt]
+ [mk-acker-bolt
+ (let [old# mk-acker-bolt]
(fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
;; critical that this particular function is overridden here,
;; since the transferred stat needs to be incremented at the moment
http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
new file mode 100644
index 0000000..1e38fd7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class Acker implements IBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
+
+ private static final long serialVersionUID = 4430906880683183091L;
+
+ public static final String ACKER_COMPONENT_ID = "__acker";
+ public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+ public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+ public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+
+ public static final int TIMEOUT_BUCKET_NUM = 3;
+
+ private OutputCollector collector;
+ private RotatingMap<Object, AckObject> pending;
+
+ private class AckObject {
+ public long val = 0L;
+ public Integer spoutTask = null;
+ public boolean failed = false;
+
+ // val xor value
+ public void updateAck(Object value) {
+ val = Utils.bitXor(val, value);
+ }
+ }
+
+ public Acker() {
+
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (TupleUtils.isTick(input)) {
+ Map<Object, AckObject> tmp = pending.rotate();
+ LOG.debug("Number of timeout tuples:{}", tmp.size());
+ }
+
+ String streamId = input.getSourceStreamId();
+ Object id = input.getValue(0);
+ AckObject curr = pending.get(id);
+ if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+ if (curr == null) {
+ curr = new AckObject();
+ curr.val = input.getLong(1);
+ curr.spoutTask = input.getInteger(2);
+ pending.put(id, curr);
+ } else {
+ // If receiving bolt's ack before the init message from spout, just update the xor value.
+ curr.updateAck(input.getValue(1));
+ curr.spoutTask = input.getInteger(2);
+ }
+ } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+ if (curr != null) {
+ curr.updateAck(input.getValue(1));
+ } else {
+ curr = new AckObject();
+ curr.val = input.getLong(1);
+ pending.put(id, curr);
+ }
+ } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+ if (curr == null) {
+ // The tuple has been already timeout or failed. So, do nothing
+ return;
+ }
+ curr.failed = true;
+ } else {
+ LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+ return;
+ }
+
+ Integer task = curr.spoutTask;
+ if (task != null) {
+ if (curr.val == 0) {
+ pending.remove(id);
+ List values = Utils.mkList(id);
+ collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
+ } else {
+ if (curr.failed) {
+ pending.remove(id);
+ List values = Utils.mkList(id);
+ collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
+ }
+ }
+ } else {
+
+ }
+
+ collector.ack(input);
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 380f4dd..e3813f8 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1374,5 +1374,61 @@ public class Utils {
return new RuntimeException(e);
}
}
+
+ public static <T> long bitXorValsSets(java.util.Set<T> vals) {
+ long rtn = 0l;
+ for (T n : vals) {
+ rtn = bitXor(rtn, n);
+ }
+ return rtn;
+ }
+
+ public static long bitXor(Object a, Object b) {
+ long rtn;
+
+ if (a instanceof Long && b instanceof Long) {
+ rtn = ((Long) a) ^ ((Long) b);
+ return rtn;
+ } else if (b instanceof Set) {
+ long bs = bitXorValsSets((Set) b);
+ return bitXor(a, bs);
+ } else if (a instanceof Set) {
+ long as = bitXorValsSets((Set) a);
+ return bitXor(as, b);
+ } else {
+ long ai = Long.parseLong(String.valueOf(a));
+ long bi = Long.parseLong(String.valueOf(b));
+ rtn = ai ^ bi;
+ return rtn;
+ }
+ }
+
+ public static <V> List<V> mkList(V... args) {
+ ArrayList<V> rtn = new ArrayList<V>();
+ for (V o : args) {
+ rtn.add(o);
+ }
+ return rtn;
+ }
+
+ public static <V> List<V> mkList(java.util.Set<V> args) {
+ ArrayList<V> rtn = new ArrayList<V>();
+ if (args != null) {
+ for (V o : args) {
+ rtn.add(o);
+ }
+ }
+ return rtn;
+ }
+
+ public static <V> List<V> mkList(Collection<V> args) {
+ ArrayList<V> rtn = new ArrayList<V>();
+ if (args != null) {
+ for (V o : args) {
+ rtn.add(o);
+ }
+ }
+ return rtn;
+ }
}
[06/13] storm git commit: Update according to review comments
Posted by lo...@apache.org.
Update according to review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7ce9a3c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7ce9a3c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7ce9a3c9
Branch: refs/heads/master
Commit: 7ce9a3c9ebf460143ad75275e4108b0b32f0b206
Parents: a7d0289
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 14 13:03:40 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 14 13:03:40 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/acker.clj | 8 +++---
.../jvm/org/apache/storm/daemon/AckerBolt.java | 13 +++++----
.../src/jvm/org/apache/storm/utils/Utils.java | 28 --------------------
3 files changed, 10 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7ce9a3c9/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 39e6f55..9bd4f44 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -22,10 +22,10 @@
(org.apache.storm.daemon AckerBolt))
(:use [org.apache.storm config util])
(:gen-class
- :init init
- :implements [org.apache.storm.task.IBolt]
- :constructors {[] []}
- :state state))
+ :init init
+ :implements [org.apache.storm.task.IBolt]
+ :constructors {[] []}
+ :state state))
(def ACKER-COMPONENT-ID AckerBolt/ACKER_COMPONENT_ID)
(def ACKER-INIT-STREAM-ID AckerBolt/ACKER_INIT_STREAM_ID)
http://git-wip-us.apache.org/repos/asf/storm/blob/7ce9a3c9/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
index 763b9a0..7c1514f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -21,6 +21,7 @@ import org.apache.storm.task.IBolt;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
import org.apache.storm.utils.RotatingMap;
import org.apache.storm.utils.TupleUtils;
import org.apache.storm.utils.Utils;
@@ -81,12 +82,12 @@ public class AckerBolt implements IBolt {
pending.put(id, curr);
} else {
// If receiving bolt's ack before the init message from spout, just update the xor value.
- curr.updateAck(input.getValue(1));
+ curr.updateAck(input.getLong(1));
curr.spoutTask = input.getInteger(2);
}
} else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
if (curr != null) {
- curr.updateAck(input.getValue(1));
+ curr.updateAck(input.getLong(1));
} else {
curr = new AckObject();
curr.val = input.getLong(1);
@@ -107,13 +108,11 @@ public class AckerBolt implements IBolt {
if (task != null) {
if (curr.val == 0) {
pending.remove(id);
- List values = Utils.makeList(id);
- collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
+ collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
} else {
if (curr.failed) {
pending.remove(id);
- List values = Utils.makeList(id);
- collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
+ collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
}
}
}
@@ -123,6 +122,6 @@ public class AckerBolt implements IBolt {
@Override
public void cleanup() {
-
+ LOG.info("Acker: cleanup successfully");
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7ce9a3c9/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 9ca2ece..2361987 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1378,33 +1378,5 @@ public class Utils {
public static long bitXor(Object a, Object b) {
return ((Long) a) ^ ((Long) b);
}
-
- public static <V> List<V> makeList(V... args) {
- ArrayList<V> rtn = new ArrayList<V>();
- for (V o : args) {
- rtn.add(o);
- }
- return rtn;
- }
-
- public static <V> List<V> makeList(java.util.Set<V> args) {
- ArrayList<V> rtn = new ArrayList<V>();
- if (args != null) {
- for (V o : args) {
- rtn.add(o);
- }
- }
- return rtn;
- }
-
- public static <V> List<V> makeList(Collection<V> args) {
- ArrayList<V> rtn = new ArrayList<V>();
- if (args != null) {
- for (V o : args) {
- rtn.add(o);
- }
- }
- return rtn;
- }
}
[02/13] storm git commit: [Storm 1245] port backtype.storm.daemon.acker to java
Posted by lo...@apache.org.
[Storm 1245] port backtype.storm.daemon.acker to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/675b0c4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/675b0c4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/675b0c4f
Branch: refs/heads/master
Commit: 675b0c4f786838a13122b6743ca6c946aa1d63ee
Parents: e9dc271
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Feb 3 15:46:08 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Feb 3 15:46:08 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/acker.clj | 57 ++++++++
.../src/clj/org/apache/storm/daemon/common.clj | 16 +--
storm-core/src/clj/org/apache/storm/testing.clj | 11 +-
.../src/jvm/org/apache/storm/daemon/Acker.java | 133 -------------------
.../jvm/org/apache/storm/daemon/AckerBolt.java | 129 ++++++++++++++++++
5 files changed, 197 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
new file mode 100644
index 0000000..9902b35
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -0,0 +1,57 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements. See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership. The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License. You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.acker
+ (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+ (:import [org.apache.storm.tuple Tuple Fields])
+ (:import [org.apache.storm.utils RotatingMap MutableObject])
+ (:import [java.util List Map])
+ (:import [org.apache.storm Constants]
+ (org.apache.storm.daemon AckerBolt))
+ (:use [org.apache.storm config util log])
+ (:gen-class
+ :init init
+ :implements [org.apache.storm.task.IBolt]
+ :constructors {[] []}
+ :state state))
+
+(def ACKER-COMPONENT-ID AckerBolt/ACKER_COMPONENT_ID)
+(def ACKER-INIT-STREAM-ID AckerBolt/ACKER_INIT_STREAM_ID)
+(def ACKER-ACK-STREAM-ID AckerBolt/ACKER_ACK_STREAM_ID)
+(def ACKER-FAIL-STREAM-ID AckerBolt/ACKER_FAIL_STREAM_ID)
+
+(defn mk-acker-bolt []
+ (let [output-collector (MutableObject.)
+ pending (MutableObject.)]
+ (log-message "Symbol AckerBolt" (symbol "AckerBolt") )
+ (AckerBolt.)))
+
+(defn -init []
+ [[] (container)])
+
+(defn -prepare [this conf context collector]
+ (let [^IBolt ret (mk-acker-bolt)]
+ (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+ (.prepare ret conf context collector)))
+
+(defn -execute [this tuple]
+ (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+ (.execute delegate tuple)
+ ))
+
+(defn -cleanup [this]
+ (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+ (.cleanup delegate)
+ ))
http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 45e0582..6ecc918 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -15,7 +15,6 @@
;; limitations under the License.
(ns org.apache.storm.daemon.common
(:use [org.apache.storm log config util])
- (:import [org.apache.storm.daemon Acker])
(:import [org.apache.storm.generated StormTopology
InvalidTopologyException GlobalStreamId]
[org.apache.storm.utils ThriftTopologyUtils])
@@ -24,19 +23,20 @@
(:import [org.apache.storm Constants])
(:import [org.apache.storm.metric SystemBolt])
(:import [org.apache.storm.metric EventLoggerBolt])
- (:import [org.apache.storm.security.auth IAuthorizer])
+ (:import [org.apache.storm.security.auth IAuthorizer])
(:import [java.io InterruptedIOException])
- (:require [clojure.set :as set])
+ (:require [clojure.set :as set])
+ (:require [org.apache.storm.daemon.acker :as acker])
(:require [org.apache.storm.thrift :as thrift])
(:require [metrics.reporters.jmx :as jmx]))
(defn start-metrics-reporters []
(jmx/start (jmx/reporter {})))
-(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
-(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
-(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
-(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
+(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
+(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
+(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
+(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
(def SYSTEM-STREAM-ID "__system")
@@ -207,7 +207,7 @@
(defn add-acker! [storm-conf ^StormTopology ret]
(let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
- (Acker. )
+ (new org.apache.storm.daemon.acker)
{ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
}
http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 08662ff..cc78659 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -45,9 +45,9 @@
(:import [org.apache.storm.tuple Tuple])
(:import [org.apache.storm.generated StormTopology])
(:import [org.apache.storm.task TopologyContext])
- (:import [org.apache.storm.daemon Acker])
(:require [org.apache.storm [zookeeper :as zk]])
(:require [org.apache.storm.messaging.loader :as msg-loader])
+ (:require [org.apache.storm.daemon.acker :as acker])
(:use [org.apache.storm cluster util thrift config log local-state]))
(defn feeder-spout
@@ -612,11 +612,6 @@
(get key)
.get))
-;; Temporary solution. It should be removed after migration.
-(defn mk-acker-bolt
- []
- (Acker.))
-
(defmacro with-tracked-cluster
[[cluster-sym & cluster-args] & body]
`(let [id# (uuid)]
@@ -627,8 +622,8 @@
(.put "transferred" (AtomicInteger. 0))
(.put "processed" (AtomicInteger. 0))))
(with-var-roots
- [mk-acker-bolt
- (let [old# mk-acker-bolt]
+ [acker/mk-acker-bolt
+ (let [old# acker/mk-acker-bolt]
(fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
;; critical that this particular function is overridden here,
;; since the transferred stat needs to be incremented at the moment
http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
deleted file mode 100644
index 1e38fd7..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import org.apache.storm.task.IBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.RotatingMap;
-import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class Acker implements IBolt {
- private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
-
- private static final long serialVersionUID = 4430906880683183091L;
-
- public static final String ACKER_COMPONENT_ID = "__acker";
- public static final String ACKER_INIT_STREAM_ID = "__ack_init";
- public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
- public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
-
- public static final int TIMEOUT_BUCKET_NUM = 3;
-
- private OutputCollector collector;
- private RotatingMap<Object, AckObject> pending;
-
- private class AckObject {
- public long val = 0L;
- public Integer spoutTask = null;
- public boolean failed = false;
-
- // val xor value
- public void updateAck(Object value) {
- val = Utils.bitXor(val, value);
- }
- }
-
- public Acker() {
-
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
- }
-
- @Override
- public void execute(Tuple input) {
- if (TupleUtils.isTick(input)) {
- Map<Object, AckObject> tmp = pending.rotate();
- LOG.debug("Number of timeout tuples:{}", tmp.size());
- }
-
- String streamId = input.getSourceStreamId();
- Object id = input.getValue(0);
- AckObject curr = pending.get(id);
- if (ACKER_INIT_STREAM_ID.equals(streamId)) {
- if (curr == null) {
- curr = new AckObject();
- curr.val = input.getLong(1);
- curr.spoutTask = input.getInteger(2);
- pending.put(id, curr);
- } else {
- // If receiving bolt's ack before the init message from spout, just update the xor value.
- curr.updateAck(input.getValue(1));
- curr.spoutTask = input.getInteger(2);
- }
- } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
- if (curr != null) {
- curr.updateAck(input.getValue(1));
- } else {
- curr = new AckObject();
- curr.val = input.getLong(1);
- pending.put(id, curr);
- }
- } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
- if (curr == null) {
- // The tuple has been already timeout or failed. So, do nothing
- return;
- }
- curr.failed = true;
- } else {
- LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
- return;
- }
-
- Integer task = curr.spoutTask;
- if (task != null) {
- if (curr.val == 0) {
- pending.remove(id);
- List values = Utils.mkList(id);
- collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
- } else {
- if (curr.failed) {
- pending.remove(id);
- List values = Utils.mkList(id);
- collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
- }
- }
- } else {
-
- }
-
- collector.ack(input);
- }
-
- @Override
- public void cleanup() {
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
new file mode 100644
index 0000000..80ed4ca
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class AckerBolt implements IBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(AckerBolt.class);
+
+ private static final long serialVersionUID = 4430906880683183091L;
+
+ public static final String ACKER_COMPONENT_ID = "__acker";
+ public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+ public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+ public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+
+ public static final int TIMEOUT_BUCKET_NUM = 3;
+
+ private OutputCollector collector;
+ private RotatingMap<Object, AckObject> pending;
+
+ private class AckObject {
+ public long val = 0L;
+ public Integer spoutTask = null;
+ public boolean failed = false;
+
+ // val xor value
+ public void updateAck(Object value) {
+ val = Utils.bitXor(val, value);
+ }
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ if (TupleUtils.isTick(input)) {
+ Map<Object, AckObject> tmp = pending.rotate();
+ LOG.debug("Number of timeout tuples:{}", tmp.size());
+ }
+
+ String streamId = input.getSourceStreamId();
+ Object id = input.getValue(0);
+ AckObject curr = pending.get(id);
+ if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+ if (curr == null) {
+ curr = new AckObject();
+ curr.val = input.getLong(1);
+ curr.spoutTask = input.getInteger(2);
+ pending.put(id, curr);
+ } else {
+ // If receiving bolt's ack before the init message from spout, just update the xor value.
+ curr.updateAck(input.getValue(1));
+ curr.spoutTask = input.getInteger(2);
+ }
+ } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+ if (curr != null) {
+ curr.updateAck(input.getValue(1));
+ } else {
+ curr = new AckObject();
+ curr.val = input.getLong(1);
+ pending.put(id, curr);
+ }
+ } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+ if (curr == null) {
+ // The tuple has been already timeout or failed. So, do nothing
+ return;
+ }
+ curr.failed = true;
+ } else {
+ LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+ return;
+ }
+
+ Integer task = curr.spoutTask;
+ if (task != null) {
+ if (curr.val == 0) {
+ pending.remove(id);
+ List values = Utils.mkList(id);
+ collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
+ } else {
+ if (curr.failed) {
+ pending.remove(id);
+ List values = Utils.mkList(id);
+ collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
+ }
+ }
+ } else {
+
+ }
+
+ collector.ack(input);
+ }
+
+ @Override
+ public void cleanup() {
+
+ }
+}
\ No newline at end of file
[05/13] storm git commit: update according to review comments
Posted by lo...@apache.org.
update according to review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7d0289c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7d0289c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7d0289c
Branch: refs/heads/master
Commit: a7d0289ca52f2a9f2d84ce1bf9ebc84d3c7dcd87
Parents: c4dfa33
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Thu Feb 4 19:47:35 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Thu Feb 4 19:47:35 2016 +0800
----------------------------------------------------------------------
.../jvm/org/apache/storm/daemon/AckerBolt.java | 4 +--
.../src/jvm/org/apache/storm/utils/Utils.java | 32 +++-----------------
2 files changed, 6 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a7d0289c/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
index a4f6815..763b9a0 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -107,12 +107,12 @@ public class AckerBolt implements IBolt {
if (task != null) {
if (curr.val == 0) {
pending.remove(id);
- List values = Utils.mkList(id);
+ List values = Utils.makeList(id);
collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
} else {
if (curr.failed) {
pending.remove(id);
- List values = Utils.mkList(id);
+ List values = Utils.makeList(id);
collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a7d0289c/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index e3813f8..9ca2ece 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1375,35 +1375,11 @@ public class Utils {
}
}
- public static <T> long bitXorValsSets(java.util.Set<T> vals) {
- long rtn = 0l;
- for (T n : vals) {
- rtn = bitXor(rtn, n);
- }
- return rtn;
- }
-
public static long bitXor(Object a, Object b) {
- long rtn;
-
- if (a instanceof Long && b instanceof Long) {
- rtn = ((Long) a) ^ ((Long) b);
- return rtn;
- } else if (b instanceof Set) {
- long bs = bitXorValsSets((Set) b);
- return bitXor(a, bs);
- } else if (a instanceof Set) {
- long as = bitXorValsSets((Set) a);
- return bitXor(as, b);
- } else {
- long ai = Long.parseLong(String.valueOf(a));
- long bi = Long.parseLong(String.valueOf(b));
- rtn = ai ^ bi;
- return rtn;
- }
+ return ((Long) a) ^ ((Long) b);
}
- public static <V> List<V> mkList(V... args) {
+ public static <V> List<V> makeList(V... args) {
ArrayList<V> rtn = new ArrayList<V>();
for (V o : args) {
rtn.add(o);
@@ -1411,7 +1387,7 @@ public class Utils {
return rtn;
}
- public static <V> List<V> mkList(java.util.Set<V> args) {
+ public static <V> List<V> makeList(java.util.Set<V> args) {
ArrayList<V> rtn = new ArrayList<V>();
if (args != null) {
for (V o : args) {
@@ -1421,7 +1397,7 @@ public class Utils {
return rtn;
}
- public static <V> List<V> mkList(Collection<V> args) {
+ public static <V> List<V> makeList(Collection<V> args) {
ArrayList<V> rtn = new ArrayList<V>();
if (args != null) {
for (V o : args) {
[13/13] storm git commit: Send failure response to spout instead of
doing nothing, for the case that acker receives FAIL before INIT
Posted by lo...@apache.org.
Send failure response to spout instead of doing nothing, for the case that acker receives FAIL before INIT
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8138dc7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8138dc7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8138dc7
Branch: refs/heads/master
Commit: c8138dc72d4f39fe1ebb09c42931d6cbd3198c7e
Parents: 6e433c8
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 28 21:50:09 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 28 21:50:09 2016 +0800
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/daemon/Acker.java | 7 ++++---
1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c8138dc7/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index 98f73df..7d05e24 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -94,18 +94,19 @@ public class Acker implements IBolt {
pending.put(id, curr);
}
} else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+ // For the case that ack_fail message arrives before ack_init
if (curr == null) {
- // The tuple has been already timeout or failed. So, do nothing
- return;
+ curr = new AckObject();
}
curr.failed = true;
+ pending.put(id, curr);
} else {
LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
return;
}
Integer task = curr.spoutTask;
- if (task != null) {
+ if (curr != null && task != null) {
if (curr.val == 0) {
pending.remove(id);
collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
[07/13] storm git commit: Merge remote-tracking branch
'upstream/master'
Posted by lo...@apache.org.
Merge remote-tracking branch 'upstream/master'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c0f4f01
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c0f4f01
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c0f4f01
Branch: refs/heads/master
Commit: 6c0f4f01aaef6c7a728db45c6d1b3726f2da1ab9
Parents: 7ce9a3c 12ceb09
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 14 13:47:48 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 14 13:47:48 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 17 +
README.markdown | 2 +
bin/storm-config.cmd | 6 +-
bin/storm.cmd | 47 +-
bin/storm.py | 8 +-
conf/defaults.yaml | 4 +
dev-tools/travis/travis-script.sh | 4 +-
.../starter/trident/TridentMapExample.java | 123 +++
external/sql/storm-sql-core/pom.xml | 9 +
external/storm-elasticsearch/pom.xml | 2 +
.../storm/hbase/security/HBaseSecurityUtil.java | 36 +-
external/storm-hdfs/README.md | 15 +-
external/storm-hdfs/pom.xml | 22 +
.../storm/hdfs/avro/AbstractAvroSerializer.java | 80 ++
.../storm/hdfs/avro/AvroSchemaRegistry.java | 28 +
.../org/apache/storm/hdfs/avro/AvroUtils.java | 44 +
.../hdfs/avro/ConfluentAvroSerializer.java | 83 ++
.../storm/hdfs/avro/FixedAvroSerializer.java | 67 ++
.../storm/hdfs/avro/GenericAvroSerializer.java | 36 +
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 4 -
.../hdfs/avro/TestFixedAvroSerializer.java | 76 ++
.../hdfs/avro/TestGenericAvroSerializer.java | 68 ++
.../test/resources/FixedAvroSerializer.config | 2 +
.../jvm/org/apache/storm/kafka/KafkaSpout.java | 8 +-
.../jvm/org/apache/storm/kafka/KafkaUtils.java | 44 +-
.../apache/storm/kafka/PartitionManager.java | 42 +-
.../kafka/trident/TridentKafkaEmitter.java | 23 +-
external/storm-mqtt/core/pom.xml | 4 +-
log4j2/cluster.xml | 2 +-
log4j2/worker.xml | 2 +-
pom.xml | 12 +-
storm-core/pom.xml | 11 +-
.../src/clj/org/apache/storm/LocalCluster.clj | 4 +-
storm-core/src/clj/org/apache/storm/clojure.clj | 8 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 27 +-
.../cluster_state/zookeeper_state_factory.clj | 14 +-
.../clj/org/apache/storm/command/blobstore.clj | 11 +-
.../org/apache/storm/command/config_value.clj | 25 -
.../org/apache/storm/command/dev_zookeeper.clj | 6 +-
.../clj/org/apache/storm/command/get_errors.clj | 12 +-
.../apache/storm/command/shell_submission.clj | 4 +-
storm-core/src/clj/org/apache/storm/config.clj | 18 +-
.../src/clj/org/apache/storm/converter.clj | 14 +-
.../src/clj/org/apache/storm/daemon/acker.clj | 21 +-
.../src/clj/org/apache/storm/daemon/common.clj | 40 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 25 +-
.../clj/org/apache/storm/daemon/executor.clj | 532 +++++-----
.../clj/org/apache/storm/daemon/logviewer.clj | 70 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 172 ++--
.../clj/org/apache/storm/daemon/supervisor.clj | 202 ++--
.../src/clj/org/apache/storm/daemon/task.clj | 4 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 78 +-
.../src/clj/org/apache/storm/disruptor.clj | 10 +-
storm-core/src/clj/org/apache/storm/event.clj | 2 +-
.../src/clj/org/apache/storm/local_state.clj | 9 +-
.../clj/org/apache/storm/messaging/loader.clj | 34 -
.../clj/org/apache/storm/messaging/local.clj | 23 -
.../org/apache/storm/pacemaker/pacemaker.clj | 7 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 24 +-
.../clj/org/apache/storm/process_simulator.clj | 4 +-
.../apache/storm/scheduler/DefaultScheduler.clj | 7 +-
.../apache/storm/scheduler/EvenScheduler.clj | 23 +-
.../storm/scheduler/IsolationScheduler.clj | 29 +-
storm-core/src/clj/org/apache/storm/stats.clj | 82 +-
storm-core/src/clj/org/apache/storm/testing.clj | 89 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 6 +-
storm-core/src/clj/org/apache/storm/timer.clj | 12 +-
.../clj/org/apache/storm/trident/testing.clj | 9 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 99 +-
.../src/clj/org/apache/storm/ui/helpers.clj | 14 +-
storm-core/src/clj/org/apache/storm/util.clj | 923 +-----------------
.../src/clj/org/apache/storm/zookeeper.clj | 1 -
storm-core/src/jvm/org/apache/storm/Config.java | 39 +
.../org/apache/storm/command/ConfigValue.java | 30 +
.../storm/daemon/metrics/MetricsUtils.java | 108 ++
.../reporters/ConsolePreparableReporter.java | 76 ++
.../reporters/CsvPreparableReporter.java | 80 ++
.../reporters/JmxPreparableReporter.java | 70 ++
.../metrics/reporters/PreparableReporter.java | 32 +
.../storm/logging/ThriftAccessLogger.java | 13 +-
.../serialization/SerializationFactory.java | 17 +-
.../staticmocking/MockedConfigUtils.java | 31 -
.../jvm/org/apache/storm/trident/Stream.java | 87 +-
.../storm/trident/operation/Consumer.java | 35 +
.../trident/operation/FlatMapFunction.java | 37 +
.../storm/trident/operation/MapFunction.java | 36 +
.../operation/impl/ConsumerExecutor.java | 38 +
.../operation/impl/FlatMapFunctionExecutor.java | 43 +
.../operation/impl/MapFunctionExecutor.java | 41 +
.../trident/planner/processor/MapProcessor.java | 87 ++
.../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +-
.../jvm/org/apache/storm/utils/Container.java | 11 +-
.../jvm/org/apache/storm/utils/IPredicate.java | 27 +
.../org/apache/storm/utils/NimbusClient.java | 2 +-
.../utils/StormConnectionStateConverter.java | 44 +
.../jvm/org/apache/storm/utils/TestUtils.java | 34 -
.../src/jvm/org/apache/storm/utils/Time.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 973 ++++++++++++++++++-
.../storm/validation/ConfigValidation.java | 2 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 7 +
.../org/apache/storm/integration_test.clj | 100 +-
.../org/apache/storm/testing4j_test.clj | 37 +-
.../apache/storm/trident/integration_test.clj | 15 +-
.../test/clj/org/apache/storm/cluster_test.clj | 20 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../clj/org/apache/storm/logviewer_test.clj | 267 ++---
.../storm/messaging/netty_integration_test.clj | 2 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 131 ++-
.../scheduler/resource_aware_scheduler_test.clj | 21 +-
.../apache/storm/security/auth/auth_test.clj | 11 +-
.../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +-
.../BlowfishTupleSerializer_test.clj | 1 -
.../clj/org/apache/storm/serialization_test.clj | 23 +-
.../clj/org/apache/storm/supervisor_test.clj | 645 ++++++------
.../clj/org/apache/storm/transactional_test.clj | 18 +
.../clj/org/apache/storm/trident/state_test.clj | 5 +-
.../clj/org/apache/storm/trident/tuple_test.clj | 15 +-
.../test/clj/org/apache/storm/utils_test.clj | 16 +-
.../test/clj/org/apache/storm/worker_test.clj | 1 -
.../staticmocking/ConfigUtilsInstaller.java | 38 +
.../utils/staticmocking/UtilsInstaller.java | 38 +
.../storm/utils/staticmocking/package-info.java | 95 ++
122 files changed, 4723 insertions(+), 2472 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6c0f4f01/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 9bd4f44,dc05dfc..7e17d40
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@@ -14,13 -14,13 +14,14 @@@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.daemon.acker
- (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+ (:import [org.apache.storm.task OutputCollector TopologyContext IBolt]
+ [org.apache.storm.utils Utils])
(:import [org.apache.storm.tuple Tuple Fields])
- (:import [org.apache.storm.utils RotatingMap MutableObject])
+ (:import [org.apache.storm.utils Container RotatingMap MutableObject])
(:import [java.util List Map])
- (:import [org.apache.storm Constants])
+ (:import [org.apache.storm Constants]
+ (org.apache.storm.daemon AckerBolt))
- (:use [org.apache.storm config util])
+ (:use [org.apache.storm config log])
(:gen-class
:init init
:implements [org.apache.storm.task.IBolt]
@@@ -35,18 -44,61 +36,18 @@@
(defn mk-acker-bolt []
(let [output-collector (MutableObject.)
pending (MutableObject.)]
- (reify IBolt
- (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
- (.setObject output-collector collector)
- (.setObject pending (RotatingMap. 2))
- )
- (^void execute [this ^Tuple tuple]
- (let [^RotatingMap pending (.getObject pending)
- stream-id (.getSourceStreamId tuple)]
- (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
- (.rotate pending)
- (let [id (.getValue tuple 0)
- ^OutputCollector output-collector (.getObject output-collector)
- curr (.get pending id)
- curr (condp = stream-id
- ACKER-INIT-STREAM-ID (-> curr
- (update-ack (.getValue tuple 1))
- (assoc :spout-task (.getValue tuple 2)))
- ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
- ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
- (.put pending id curr)
- (when (and curr (:spout-task curr))
- (cond (= 0 (:val curr))
- (do
- (.remove pending id)
- (acker-emit-direct output-collector
- (:spout-task curr)
- ACKER-ACK-STREAM-ID
- [id]
- ))
- (:failed curr)
- (do
- (.remove pending id)
- (acker-emit-direct output-collector
- (:spout-task curr)
- ACKER-FAIL-STREAM-ID
- [id]
- ))
- ))
- (.ack output-collector tuple)
- ))))
- (^void cleanup [this]
- )
- )))
+ (AckerBolt.)))
(defn -init []
- [[] (container)])
+ [[] (Container.)])
- (defn -prepare [this conf context collector]
+ (defn -prepare [^org.apache.storm.daemon.acker this conf context collector]
(let [^IBolt ret (mk-acker-bolt)]
- (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+ (.. this state (set ret))
- (.prepare ret conf context collector)
- ))
+ (.prepare ret conf context collector)))
- (defn -execute [this tuple]
- (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+ (defn -execute [^org.apache.storm.daemon.acker this tuple]
+ (let [^IBolt delegate (.. this state (get))]
(.execute delegate tuple)
))
http://git-wip-us.apache.org/repos/asf/storm/blob/6c0f4f01/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6ecc918,eb1ec1e..42fa1fa
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@@ -24,14 -26,23 +26,23 @@@
(:import [org.apache.storm.metric SystemBolt])
(:import [org.apache.storm.metric EventLoggerBolt])
(:import [org.apache.storm.security.auth IAuthorizer])
- (:import [java.io InterruptedIOException])
+ (:import [java.io InterruptedIOException]
+ [org.json.simple JSONValue])
- (:require [clojure.set :as set])
+ (:require [clojure.set :as set])
(:require [org.apache.storm.daemon.acker :as acker])
(:require [org.apache.storm.thrift :as thrift])
- (:require [metrics.reporters.jmx :as jmx]))
+ (:require [metrics.core :refer [default-registry]]))
+
+ (defn start-metrics-reporter [reporter conf]
+ (doto reporter
+ (.prepare default-registry conf)
+ (.start))
+ (log-message "Started statistics report plugin..."))
+
+ (defn start-metrics-reporters [conf]
+ (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
+ (start-metrics-reporter reporter conf)))
- (defn start-metrics-reporters []
- (jmx/start (jmx/reporter {})))
(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
@@@ -134,9 -144,10 +144,10 @@@
(defn component-conf [component]
(->> component
- .get_common
- .get_json_conf
+ .get_common
+ .get_json_conf
- from-json))
+ (#(if % (JSONValue/parse %)))
+ clojurify-structure))
(defn validate-basic! [^StormTopology topology]
(validate-ids! topology)
http://git-wip-us.apache.org/repos/asf/storm/blob/6c0f4f01/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 2361987,a0c0b1a..44fb1a1
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@@ -441,7 -509,11 +509,11 @@@ public class Utils
HashMap nconf = new HashMap(conf);
// only enable cleanup of blobstore on nimbus
nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE);
+
+ if(store != null) {
+ // store can be null during testing when mocking utils.
- store.prepare(nconf, baseDir, nimbusInfo);
+ store.prepare(nconf, baseDir, nimbusInfo);
+ }
return store;
}
@@@ -878,34 -945,18 +945,18 @@@
} else {
inputStream = new BufferedInputStream(new FileInputStream(inFile));
}
- tis = new TarArchiveInputStream(inputStream);
+ try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
- for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
- unpackEntries(tis, entry, untarDir);
- entry = tis.getNextTarEntry();
- }
+ for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+ unpackEntries(tis, entry, untarDir);
+ entry = tis.getNextTarEntry();
+ }
+ }
} finally {
- cleanup(tis, inputStream);
+ if(inputStream != null) {
+ inputStream.close();
- }
}
}
-
- /**
- * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
- * null pointers. Must only be used for cleanup in exception handlers.
- *
- * @param closeables the objects to close
- */
- private static void cleanup(java.io.Closeable... closeables) {
- for (java.io.Closeable c : closeables) {
- if (c != null) {
- try {
- c.close();
- } catch (IOException e) {
- LOG.debug("Exception in closing " + c, e);
-
+ }
- }
- }
- }
private static void unpackEntries(TarArchiveInputStream tis,
TarArchiveEntry entry, File outputDir) throws IOException {
@@@ -1186,7 -1258,12 +1258,12 @@@
return dump.toString();
}
- // Assumes caller is synchronizing
- /**
++ /*
+ * Creates an instance of the pluggable SerializationDelegate or falls back to
+ * DefaultSerializationDelegate if something goes wrong.
+ * @param stormConf The config from which to pull the name of the pluggable class.
+ * @return an instance of the class specified by storm.meta.serialization.delegate
+ */
private static SerializationDelegate getSerializationDelegate(Map stormConf) {
String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
SerializationDelegate delegate;
@@@ -1375,8 -1446,806 +1446,810 @@@
}
}
+ /**
+ * Determines if a zip archive contains a particular directory.
+ *
+ * @param zipfile path to the zipped file
+ * @param target directory being looked for in the zip.
+ * @return boolean whether or not the directory exists in the zip.
+ */
+ public static boolean zipDoesContainDir(String zipfile, String target) throws IOException {
+ List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new ZipFile(zipfile).entries());
+
+ String targetDir = target + "/";
+ for(ZipEntry entry : entries) {
+ String name = entry.getName();
+ if(name.startsWith(targetDir)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ /**
+ * Joins any number of maps together into a single map, combining their values into
+ * a list, maintaining values in the order the maps were passed in. Nulls are inserted
+ * for given keys when the map does not contain that key.
+ *
+ * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) ->
+ * {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]}
+ *
+ * @param maps variable number of maps to join - order affects order of values in output.
+ * @return combined map
+ */
+ public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+ Map<K, List<V>> ret = new HashMap<>();
+
+ Set<K> keys = new HashSet<>();
+
+ for(Map<K, V> map : maps) {
+ keys.addAll(map.keySet());
+ }
+
+ for(Map<K, V> m : maps) {
+ for(K key : keys) {
+ V value = m.get(key);
+
+ if(!ret.containsKey(key)) {
+ ret.put(key, new ArrayList<V>());
+ }
+
+ List<V> targetList = ret.get(key);
+ targetList.add(value);
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Fills up chunks out of a collection (given a maximum amount of chunks)
+ *
+ * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+ * partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+ * partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+ * @param maxNumChunks the maximum number of chunks to return
+ * @param coll the collection to be chunked up
+ * @return a list of the chunks, which are themselves lists.
+ */
+ public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
+ List<List<T>> ret = new ArrayList<>();
+
+ if(maxNumChunks == 0 || coll == null) {
+ return ret;
+ }
+
+ Map<Integer, Integer> parts = integerDivided(coll.size(), maxNumChunks);
+
+ // Keys sorted in descending order
+ List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
+ Collections.sort(sortedKeys, Collections.reverseOrder());
+
+
+ Iterator<T> it = coll.iterator();
+ for(Integer chunkSize : sortedKeys) {
+ if(!it.hasNext()) { break; }
+ Integer times = parts.get(chunkSize);
+ for(int i = 0; i < times; i++) {
+ if(!it.hasNext()) { break; }
+ List<T> chunkList = new ArrayList<>();
+ for(int j = 0; j < chunkSize; j++) {
+ if(!it.hasNext()) { break; }
+ chunkList.add(it.next());
+ }
+ ret.add(chunkList);
+ }
+ }
+
+ return ret;
+ }
+
+ /**
+ * Return a new instance of a pluggable specified in the conf.
+ * @param conf The conf to read from.
+ * @param configKey The key pointing to the pluggable class
+ * @return an instance of the class or null if it is not specified.
+ */
+ public static Object getConfiguredClass(Map conf, Object configKey) {
+ if (conf.containsKey(configKey)) {
+ return newInstance((String)conf.get(configKey));
+ }
+ return null;
+ }
+
+ public static String logsFilename(String stormId, String port) {
+ return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "worker.log";
+ }
+
+ public static String eventLogsFilename(String stormId, String port) {
+ return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "events.log";
+ }
+
+ public static Object readYamlFile(String yamlFile) {
+ try (FileReader reader = new FileReader(yamlFile)) {
+ return new Yaml(new SafeConstructor()).load(reader);
+ } catch(Exception ex) {
+ LOG.error("Failed to read yaml file.", ex);
+ }
+ return null;
+ }
+
+ public static void setupDefaultUncaughtExceptionHandler() {
+ Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread thread, Throwable thrown) {
+ try {
+ handleUncaughtException(thrown);
+ } catch (Error err) {
+ LOG.error("Received error in main thread.. terminating server...", err);
+ Runtime.getRuntime().exit(-2);
+ }
+ }
+ });
+ }
+
+ /**
+ * Creates a new map with a string value in the map replaced with an
+ * equal-length string of '#' characters.
+ * @param m The map that a value will be redacted from
+ * @param key The key pointing to the value to be redacted
+ * @return a new map with the value redacted, or {@code m} itself if it does not contain the key. The original map is never modified.
+ */
+ public static Map<Object, String> redactValue(Map<Object, String> m, Object key) {
+ if(m.containsKey(key)) {
+ HashMap<Object, String> newMap = new HashMap<>(m);
+ String value = newMap.get(key);
+ String redacted = new String(new char[value.length()]).replace("\0", "#");
+ newMap.put(key, redacted);
+ return newMap;
+ }
+ return m;
+ }
+
+ /**
+ * Make sure a given key name is valid for the storm config.
+ * Throw RuntimeException if the key isn't valid.
+ * @param name The name of the config key to check.
+ */
+ private static final Set<String> disallowedKeys = new HashSet<>(Arrays.asList(new String[] {"/", ".", ":", "\\"}));
+ public static void validateKeyName(String name) {
+
+ for(String key : disallowedKeys) {
+ if( name.contains(key) ) {
+ throw new RuntimeException("Key name cannot contain any of the following: " + disallowedKeys.toString());
+ }
+ }
+ if(name.trim().isEmpty()) {
+ throw new RuntimeException("Key name cannot be blank");
+ }
+ }
+
+ /**
+ * Find the first item of coll for which pred.test(...) returns true.
+ * @param pred The IPredicate to test for
+ * @param coll The Collection of items to search through.
+ * @return The first matching value in coll, or null if nothing matches.
+ */
+ public static <T> T findOne (IPredicate<T> pred, Collection<T> coll) {
+ if(coll == null) {
+ return null;
+ }
+ for(T elem : coll) {
+ if (pred.test(elem)) {
+ return elem;
+ }
+ }
+ return null;
+ }
+
+ public static <T, U> T findOne (IPredicate<T> pred, Map<U, T> map) {
+ if(map == null) {
+ return null;
+ }
+ return findOne(pred, (Set<T>)map.entrySet());
+ }
+
+ public static String localHostname () throws UnknownHostException {
+ return _instance.localHostnameImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected String localHostnameImpl () throws UnknownHostException {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+
+ private static String memoizedLocalHostnameString = null;
+
+ public static String memoizedLocalHostname () throws UnknownHostException {
+ if (memoizedLocalHostnameString == null) {
+ memoizedLocalHostnameString = localHostname();
+ }
+ return memoizedLocalHostnameString;
+ }
+
+ /**
+ * Gets the storm.local.hostname value, or tries to figure out the local hostname
+ * if it is not set in the config.
+ * @param conf The storm config to read from
+ * @return a string representation of the hostname.
+ */
+ public static String hostname (Map<String, Object> conf) throws UnknownHostException {
+ if (conf == null) {
+ return memoizedLocalHostname();
+ }
+ Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+ if (hostnameString == null || hostnameString.equals("")) {
+ return memoizedLocalHostname();
+ }
+ return (String)hostnameString;
+ }
+
+ public static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
+ public static void exitProcess (int val, Object... msg) {
+ StringBuilder errorMessage = new StringBuilder();
+ errorMessage.append("Halting process: ");
+ for (Object oneMessage: msg) {
+ errorMessage.append(oneMessage);
+ }
+ String combinedErrorMessage = errorMessage.toString();
+ LOG.error(combinedErrorMessage, new RuntimeException(combinedErrorMessage));
+ Runtime.getRuntime().exit(val);
+ }
+
+ /**
+ * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 [:c]}"
+ *
+ * Example usage in java:
+ * Map<Integer, String> tasks;
+ * Map<String, List<Integer>> componentTasks = Utils.reverseMap(tasks);
+ *
+ * The order of the resulting list values depends on the ordering properties
+ * of the Map passed in. The caller is responsible for passing an ordered
+ * map if they expect the result to be consistently ordered as well.
+ *
+ * @param map to reverse
+ * @return a reversed map
+ */
+ public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+ HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+ if (map == null) {
+ return rtn;
+ }
+ for (Entry<K, V> entry : map.entrySet()) {
+ K key = entry.getKey();
+ V val = entry.getValue();
+ List<K> list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<K>();
+ rtn.put(entry.getValue(), list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+ /**
+ * "[[:a 1] [:b 1] [:c 2]] -> {1 [:a :b] 2 [:c]}"
+ * Reverses an assoc-list style Map like reverseMap(Map...)
+ *
+ * @param listSeq to reverse
+ * @return a reversed map
+ */
+ public static HashMap reverseMap(List listSeq) {
+ HashMap<Object, List<Object>> rtn = new HashMap();
+ if (listSeq == null) {
+ return rtn;
+ }
+ for (Object entry : listSeq) {
+ List listEntry = (List) entry;
+ Object key = listEntry.get(0);
+ Object val = listEntry.get(1);
+ List list = rtn.get(val);
+ if (list == null) {
+ list = new ArrayList<Object>();
+ rtn.put(val, list);
+ }
+ list.add(key);
+ }
+ return rtn;
+ }
+
+
+ /**
+ * @return the pid of this JVM, because Java doesn't provide a real way to do this.
+ */
+ public static String processPid() {
+ String name = ManagementFactory.getRuntimeMXBean().getName();
+ String[] split = name.split("@");
+ if (split.length != 2) {
+ throw new RuntimeException("Got unexpected process name: " + name);
+ }
+ return split[0];
+ }
+
+ public static int execCommand(String... command) throws ExecuteException, IOException {
+ CommandLine cmd = new CommandLine(command[0]);
+ for (int i = 1; i < command.length; i++) {
+ cmd.addArgument(command[i]);
+ }
+
+ DefaultExecutor exec = new DefaultExecutor();
+ return exec.execute(cmd);
+ }
+
+    /**
+     * Extract dir from the jar to destdir.
+     *
+     * Every non-directory entry whose name starts with dir is copied to
+     * destdir under its full entry name. Entries whose resolved location
+     * would fall outside destdir (for example names containing "..") are
+     * skipped with a warning instead of being written. An IOException during
+     * extraction is logged, together with its cause, and not rethrown.
+     *
+     * @param jarpath Path to the jar file
+     * @param dir Directory in the jar to pull out
+     * @param destdir Path to the directory where the extracted directory will be put
+     *
+     */
+    public static void extractDirFromJar(String jarpath, String dir, String destdir) {
+        try (JarFile jarFile = new JarFile(jarpath)) {
+            // Canonical destination root, with a trailing separator so that a
+            // sibling such as "<destdir>-evil" cannot pass the prefix check.
+            String destRoot = new File(destdir).getCanonicalPath();
+            if (!destRoot.endsWith(File.separator)) {
+                destRoot = destRoot + File.separator;
+            }
+            Enumeration<JarEntry> jarEnums = jarFile.entries();
+            while (jarEnums.hasMoreElements()) {
+                JarEntry entry = jarEnums.nextElement();
+                if (!entry.isDirectory() && entry.getName().startsWith(dir)) {
+                    File aFile = new File(destdir, entry.getName());
+                    // Guard against entry names that climb out of destdir.
+                    if (!aFile.getCanonicalPath().startsWith(destRoot)) {
+                        LOG.warn("Skipping jar entry {} from {}: it resolves outside of {}",
+                                entry.getName(), jarpath, destdir);
+                        continue;
+                    }
+                    aFile.getParentFile().mkdirs();
+                    try (FileOutputStream out = new FileOutputStream(aFile);
+                         InputStream in = jarFile.getInputStream(entry)) {
+                        IOUtils.copy(in, out);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            // Best effort: keep the original "log and continue" contract, but
+            // do not lose the cause.
+            LOG.info("Could not extract {} from {}", dir, jarpath, e);
+        }
+    }
+
+ public static void sendSignalToProcess(long lpid, int signum) throws IOException {
+ String pid = Long.toString(lpid);
+ try {
+ if (isOnWindows()) {
+ if (signum == SIGKILL) {
+ execCommand("taskkill", "/f", "/pid", pid);
+ } else {
+ execCommand("taskkill", "/pid", pid);
+ }
+ } else {
+ execCommand("kill", "-" + signum, pid);
+ }
+ } catch (ExecuteException e) {
+ LOG.info("Error when trying to kill {}. Process is probably already dead.", pid);
+ } catch (IOException e) {
+ LOG.info("IOException Error when trying to kill {}.", pid);
+ throw e;
+ }
+ }
+
+ public static void forceKillProcess (String pid) throws IOException {
+ sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+ }
+
+ public static void killProcessWithSigTerm (String pid) throws IOException {
+ sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+ }
+
+ /**
+ * Adds the user supplied function as a shutdown hook for cleanup.
+ * Also adds a function that sleeps for a second and then halts the
+ * runtime to avoid any zombie process in case cleanup function hangs.
+ */
+ public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
+ Runnable sleepKill = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Time.sleepSecs(1);
+ Runtime.getRuntime().halt(20);
+ } catch (Exception e) {
+ LOG.warn("Exception in the ShutDownHook", e);
+ }
+ }
+ };
+ Runtime.getRuntime().addShutdownHook(new Thread(func));
+ Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+ }
+
+ /**
+ * Returns the combined string, escaped for posix shell.
+ * @param command the list of strings to be combined
+ * @return the resulting command string
+ */
+ public static String shellCmd (List<String> command) {
+ List<String> changedCommands = new ArrayList<>(command.size());
+ for (String str: command) {
+ if (str == null) {
+ continue;
+ }
+ changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") + "'");
+ }
+ return StringUtils.join(changedCommands, " ");
+ }
+
+ public static String scriptFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
+ }
+
+ public static String containerFilePath (String dir) {
+ return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
+ }
+
+ public static Object nullToZero (Object v) {
+ return (v != null ? v : 0);
+ }
+
+ /**
+ * Deletes a file or directory and its contents if it exists. Does not
+ * complain if the input is null or does not exist.
+ * @param path the path to the file or directory
+ */
+ public static void forceDelete(String path) throws IOException {
+ _instance.forceDeleteImpl(path);
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ protected void forceDeleteImpl(String path) throws IOException {
+ LOG.debug("Deleting path {}", path);
+ if (checkFileExists(path)) {
+ try {
+ FileUtils.forceDelete(new File(path));
+ } catch (FileNotFoundException ignored) {}
+ }
+ }
+
+ /**
+ * Creates a symbolic link to the target
+ * @param dir the parent directory of the link
+ * @param targetDir the parent directory of the link's target
+ * @param targetFilename the file name of the links target
+ * @param filename the file name of the link
+ * @throws IOException
+ */
+ public static void createSymlink(String dir, String targetDir,
+ String targetFilename, String filename) throws IOException {
+ Path path = Paths.get(dir, filename).toAbsolutePath();
+ Path target = Paths.get(targetDir, targetFilename).toAbsolutePath();
+ LOG.debug("Creating symlink [{}] to [{}]", path, target);
+ if (!path.toFile().exists()) {
+ Files.createSymbolicLink(path, target);
+ }
+ }
+
+ /**
+ * Convenience method for the case when the link's file name should be the
+ * same as the file name of the target
+ */
+ public static void createSymlink(String dir, String targetDir,
+ String targetFilename) throws IOException {
+ Utils.createSymlink(dir, targetDir, targetFilename,
+ targetFilename);
+ }
+
+ /**
+ * Returns a Collection of file names found under the given directory.
+ * @param dir a directory
+ * @return the Collection of file names
+ */
+ public static Collection<String> readDirContents(String dir) {
+ Collection<String> ret = new HashSet<>();
+ File[] files = new File(dir).listFiles();
+ if (files != null) {
+ for (File f: files) {
+ ret.add(f.getName());
+ }
+ }
+ return ret;
+ }
+
+ /**
+ * Returns the value of java.class.path System property. Kept separate for
+ * testing.
+ * @return the classpath
+ */
+ public static String currentClasspath() {
+ return _instance.currentClasspathImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ public String currentClasspathImpl() {
+ return System.getProperty("java.class.path");
+ }
+
+ /**
+ * Returns a collection of jar file names found under the given directory.
+ * @param dir the directory to search
+ * @return the jar file names
+ */
+ private static List<String> getFullJars(String dir) {
+ File[] files = new File(dir).listFiles(jarFilter);
+
+ if(files == null) {
+ return new ArrayList<>();
+ }
+
+ List<String> ret = new ArrayList<>(files.length);
+ for (File f : files) {
+ ret.add(Paths.get(dir, f.getName()).toString());
+ }
+ return ret;
+ }
+ private static final FilenameFilter jarFilter = new FilenameFilter() {
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.endsWith(".jar");
+ }
+ };
+
+
+ public static String workerClasspath() {
+ String stormDir = System.getProperty("storm.home");
+
+ if (stormDir == null) {
+ return Utils.currentClasspath();
+ }
+
+ String stormLibDir = Paths.get(stormDir, "lib").toString();
+ String stormConfDir =
+ System.getenv("STORM_CONF_DIR") != null ?
+ System.getenv("STORM_CONF_DIR") :
+ Paths.get(stormDir, "conf").toString();
+ String stormExtlibDir = Paths.get(stormDir, "extlib").toString();
+ String extcp = System.getenv("STORM_EXT_CLASSPATH");
+ List<String> pathElements = new LinkedList<>();
+ pathElements.addAll(Utils.getFullJars(stormLibDir));
+ pathElements.addAll(Utils.getFullJars(stormExtlibDir));
+ pathElements.add(extcp);
+ pathElements.add(stormConfDir);
+
+ return StringUtils.join(pathElements,
+ CLASS_PATH_SEPARATOR);
+ }
+
+ public static String addToClasspath(String classpath,
+ Collection<String> paths) {
+ return _instance.addToClasspathImpl(classpath, paths);
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ public String addToClasspathImpl(String classpath,
+ Collection<String> paths) {
+ if (paths == null || paths.isEmpty()) {
+ return classpath;
+ }
+ List<String> l = new LinkedList<>();
+ l.add(classpath);
+ l.addAll(paths);
+ return StringUtils.join(l, CLASS_PATH_SEPARATOR);
+ }
+
+ public static class UptimeComputer {
+ int startTime = 0;
+
+ public UptimeComputer() {
+ startTime = Time.currentTimeSecs();
+ }
+
+ public int upTime() {
+ return Time.deltaSecs(startTime);
+ }
+ }
+
+ public static UptimeComputer makeUptimeComputer() {
+ return _instance.makeUptimeComputerImpl();
+ }
+
+ // Non-static impl methods exist for mocking purposes.
+ public UptimeComputer makeUptimeComputerImpl() {
+ return new UptimeComputer();
+ }
+
+ /**
+ * Writes a posix shell script file to be executed in its own process.
+ * @param dir the directory under which the script is to be written
+ * @param command the command the script is to execute
+ * @param environment optional environment variables to set before running the script's command. May be null.
+ * @return the path to the script that has been written
+ */
+ public static String writeScript(String dir, List<String> command,
+ Map<String,String> environment) throws IOException {
+ String path = Utils.scriptFilePath(dir);
+ try(BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
+ out.write("#!/bin/bash");
+ out.newLine();
+ if (environment != null) {
+ for (String k : environment.keySet()) {
+ String v = environment.get(k);
+ if (v == null) {
+ v = "";
+ }
+ out.write(Utils.shellCmd(
+ Arrays.asList(
+ "export",k+"="+v)));
+ out.write(";");
+ out.newLine();
+ }
+ }
+ out.newLine();
+ out.write("exec "+Utils.shellCmd(command)+";");
+ }
+ return path;
+ }
+
+ /**
+ * A thread that can answer if it is sleeping in the case of simulated time.
+ * This class is not useful when simulated time is not being used.
+ */
+ public static class SmartThread extends Thread {
+ public boolean isSleeping() {
+ return Time.isThreadWaiting(this);
+ }
+ public SmartThread(Runnable r) {
+ super(r);
+ }
+ }
+
+ /**
+ * Creates a thread that calls the given code repeatedly, sleeping for an
+ * interval of seconds equal to the return value of the previous call.
+ *
+ * The given afn may be a callable that returns the number of seconds to
+ * sleep, or it may be a Callable that returns another Callable that in turn
+ * returns the number of seconds to sleep. In the latter case isFactory must be true.
+ *
+ * @param afn the code to call on each iteration
+ * @param isDaemon whether the new thread should be a daemon thread
+ * @param eh code to call when afn throws an exception
+ * @param priority the new thread's priority
+ * @param isFactory whether afn returns a callable instead of sleep seconds
+ * @param startImmediately whether to start the thread before returning
+ * @param threadName a suffix to be appended to the thread name
+ * @return the newly created thread
+ * @see java.lang.Thread
+ */
+ public static SmartThread asyncLoop(final Callable afn,
+ boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+ int priority, final boolean isFactory, boolean startImmediately,
+ String threadName) {
+ SmartThread thread = new SmartThread(new Runnable() {
+ public void run() {
+ Object s;
+ try {
+ // When isFactory is set, afn is called once and its result is the
+ // Callable that is actually looped; otherwise afn itself is looped.
+ Callable fn = isFactory ? (Callable) afn.call() : afn;
+ // The loop continues only while the call returns a Long, which is
+ // handed to Time.sleepSecs. Any other return value (null included)
+ // ends the loop and lets the thread finish.
+ while ((s = fn.call()) instanceof Long) {
+ Time.sleepSecs((Long) s);
+ }
+ } catch (Throwable t) {
+ // An interruption (as reported by Utils.exceptionCauseIsInstanceOf)
+ // is logged at info level and ends the thread without rethrowing.
+ if (Utils.exceptionCauseIsInstanceOf(
+ InterruptedException.class, t)) {
+ LOG.info("Async loop interrupted!");
+ return;
+ }
+ // Anything else is rethrown wrapped, so it reaches the thread's
+ // uncaught-exception handler installed below.
+ LOG.error("Async loop died!", t);
+ throw new RuntimeException(t);
+ }
+ }
+ });
+ if (eh != null) {
+ thread.setUncaughtExceptionHandler(eh);
+ } else {
+ // Default handler: log the failure and terminate the process with code 1.
+ thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.error("Async loop died!", e);
+ Utils.exitProcess(1, "Async loop died!");
+ }
+ });
+ }
+ thread.setDaemon(isDaemon);
+ thread.setPriority(priority);
+ // The suffix is appended to the thread's existing name, not substituted for it.
+ if (threadName != null && !threadName.isEmpty()) {
+ thread.setName(thread.getName() +"-"+ threadName);
+ }
+ if (startImmediately) {
+ thread.start();
+ }
+ return thread;
+ }
+
+ /**
+ * Convenience method used when only the function, name suffix and uncaught-exception handler are given.
+ * @param afn the code to call on each iteration
+ * @param threadName a suffix to be appended to the thread name
+ * @return the newly created thread
+ * @see java.lang.Thread
+ */
+ public static SmartThread asyncLoop(final Callable afn, String threadName, final Thread.UncaughtExceptionHandler eh) {
+ return asyncLoop(afn, false, eh, Thread.NORM_PRIORITY, false, true,
+ threadName);
+ }
+
+ /**
+ * Convenience method used when only the function is given.
+ * @param afn the code to call on each iteration
+ * @return the newly created thread
+ */
+ public static SmartThread asyncLoop(final Callable afn) {
+ return asyncLoop(afn, false, null, Thread.NORM_PRIORITY, false, true,
+ null);
+ }
+
+ /**
+ * A callback that can accept an integer.
+ * @param <V> the result type of method <code>call</code>
+ */
+ public interface ExitCodeCallable<V> extends Callable<V> {
+ V call(int exitCode);
+ }
+
+ /**
+ * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
+ * callback.
+ * @param command the command to be executed in the new process
+ * @param environment the environment to be applied to the process. Can be
+ * null.
+ * @param logPrefix a prefix for log entries from the output of the process.
+ * Can be null.
+ * @param exitCodeCallback code to be called passing the exit code value
+ * when the process completes
+ * @param dir the working directory of the new process
+ * @return the new process
+ * @throws IOException
+ * @see java.lang.ProcessBuilder
+ */
+ public static Process launchProcess(List<String> command,
+ Map<String,String> environment,
+ final String logPrefix,
+ final ExitCodeCallable exitCodeCallback,
+ File dir)
+ throws IOException {
+ return _instance.launchProcessImpl(command, environment, logPrefix,
+ exitCodeCallback, dir);
+ }
+
+    /**
+     * Instance-level implementation of launchProcess.
+     *
+     * Starts the process with stderr merged into stdout. When a log prefix
+     * and/or an exit-code callback is supplied, a background loop is started
+     * that first drains and logs the process output (if logPrefix is set) and
+     * then waits for the process to finish so the callback can be invoked.
+     *
+     * @param command the command to be executed in the new process
+     * @param cmdEnv extra environment entries for the process. Can be null.
+     * @param logPrefix a prefix for log entries from the output of the
+     *                  process. Can be null.
+     * @param exitCodeCallback called with the process exit value once the
+     *                         process completes, or with -1 if the wait was
+     *                         interrupted. Can be null.
+     * @param dir the working directory of the new process. Can be null.
+     * @return the new process
+     * @throws IOException if the process cannot be started
+     */
+    public Process launchProcessImpl(
+            List<String> command,
+            Map<String,String> cmdEnv,
+            final String logPrefix,
+            final ExitCodeCallable exitCodeCallback,
+            File dir)
+            throws IOException {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        Map<String,String> procEnv = builder.environment();
+        if (dir != null) {
+            builder.directory(dir);
+        }
+        builder.redirectErrorStream(true);
+        if (cmdEnv != null) {
+            procEnv.putAll(cmdEnv);
+        }
+        final Process process = builder.start();
+        if (logPrefix != null || exitCodeCallback != null) {
+            Utils.asyncLoop(new Callable() {
+                public Object call() {
+                    if (logPrefix != null ) {
+                        Utils.readAndLogStream(logPrefix,
+                                process.getInputStream());
+                    }
+                    if (exitCodeCallback != null) {
+                        try {
+                            process.waitFor();
+                            // Only reached once the process has really exited,
+                            // so exitValue() cannot throw here.
+                            exitCodeCallback.call(process.exitValue());
+                        } catch (InterruptedException ie) {
+                            LOG.info("{} interrupted", logPrefix);
+                            // The process may still be running, so there is no
+                            // exit value to report; signal the interruption.
+                            exitCodeCallback.call(-1);
+                        }
+                    }
+                    return null; // Run only once.
+                }
+            });
+        }
+        return process;
+    }
++
+ public static long bitXor(Object a, Object b) {
- return ((Long) a) ^ ((Long) b);
++ return ((Long) a) ^ ((Long) b);
+ }
}
-
[03/13] storm git commit: [STORM-1245] port
backtype.storm.daemon.acker to java
Posted by lo...@apache.org.
[STORM-1245] port backtype.storm.daemon.acker to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50a312f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50a312f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50a312f
Branch: refs/heads/master
Commit: e50a312f1440131e8b9e0cc055d475cbbe711cb9
Parents: 675b0c4
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Feb 3 16:01:43 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Feb 3 16:01:43 2016 +0800
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/daemon/acker.clj | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e50a312f/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 9902b35..39e6f55 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -20,7 +20,7 @@
(:import [java.util List Map])
(:import [org.apache.storm Constants]
(org.apache.storm.daemon AckerBolt))
- (:use [org.apache.storm config util log])
+ (:use [org.apache.storm config util])
(:gen-class
:init init
:implements [org.apache.storm.task.IBolt]
@@ -35,7 +35,6 @@
(defn mk-acker-bolt []
(let [output-collector (MutableObject.)
pending (MutableObject.)]
- (log-message "Symbol AckerBolt" (symbol "AckerBolt") )
(AckerBolt.)))
(defn -init []
[11/13] storm git commit: restore indent change
Posted by lo...@apache.org.
restore indent change
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14b993a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14b993a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14b993a8
Branch: refs/heads/master
Commit: 14b993a882cf8d416f144a5a3cf4cb8d87a8811d
Parents: 9603e30
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 22 22:21:20 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 22 22:21:20 2016 +0800
----------------------------------------------------------------------
storm-core/src/clj/org/apache/storm/testing.clj | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/14b993a8/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index f04befc..804278c 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -691,8 +691,8 @@
(increment-global! id# "transferred" 1)
(apply transferrer# args2#)))))]
(with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
- (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
- ~@body)))
+ (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+ ~@body)))
(RegisteredGlobalState/clearState id#)))
(defn tracked-wait
[10/13] storm git commit: update according to review comments
Posted by lo...@apache.org.
update according to review comments
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9603e309
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9603e309
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9603e309
Branch: refs/heads/master
Commit: 9603e30961dbb2391bf309031f64c29f808277f7
Parents: f54f2cf
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 22 22:08:43 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 22 22:08:43 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/acker.clj | 58 ---------
.../src/clj/org/apache/storm/daemon/common.clj | 17 ++-
storm-core/src/clj/org/apache/storm/testing.clj | 11 +-
.../src/jvm/org/apache/storm/daemon/Acker.java | 127 +++++++++++++++++++
.../jvm/org/apache/storm/daemon/AckerBolt.java | 127 -------------------
.../src/jvm/org/apache/storm/utils/Utils.java | 4 +-
6 files changed, 144 insertions(+), 200 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
deleted file mode 100644
index 9aa15ae..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ /dev/null
@@ -1,58 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.acker
- (:import [org.apache.storm.task OutputCollector TopologyContext IBolt]
- [org.apache.storm.utils Utils])
- (:import [org.apache.storm.tuple Tuple Fields])
- (:import [org.apache.storm.utils Container RotatingMap MutableObject])
- (:import [java.util List Map])
- (:import [org.apache.storm Constants]
- (org.apache.storm.daemon AckerBolt))
- (:use [org.apache.storm config log])
- (:gen-class
- :init init
- :implements [org.apache.storm.task.IBolt]
- :constructors {[] []}
- :state state))
-
-(def ACKER-COMPONENT-ID AckerBolt/ACKER_COMPONENT_ID)
-(def ACKER-INIT-STREAM-ID AckerBolt/ACKER_INIT_STREAM_ID)
-(def ACKER-ACK-STREAM-ID AckerBolt/ACKER_ACK_STREAM_ID)
-(def ACKER-FAIL-STREAM-ID AckerBolt/ACKER_FAIL_STREAM_ID)
-
-(defn mk-acker-bolt []
- (let [output-collector (MutableObject.)
- pending (MutableObject.)]
- (AckerBolt.)))
-
-(defn -init []
- [[] (Container.)])
-
-(defn -prepare [^org.apache.storm.daemon.acker this conf context collector]
- (let [^IBolt ret (mk-acker-bolt)]
- (.. this state (set ret))
- (.prepare ret conf context collector)
- ))
-
-(defn -execute [^org.apache.storm.daemon.acker this tuple]
- (let [^IBolt delegate (.. this state (get))]
- (.execute delegate tuple)
- ))
-
-(defn -cleanup [^org.apache.storm.daemon.acker this]
- (let [^IBolt delegate (.. this state (get))]
- (.cleanup delegate)
- ))
http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index db7fd40..4076e38 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -29,9 +29,9 @@
(:import [java.io InterruptedIOException]
[org.json.simple JSONValue])
(:import [java.util HashMap])
- (:import [org.apache.storm Thrift])
+ (:import [org.apache.storm Thrift]
+ (org.apache.storm.daemon Acker))
(:require [clojure.set :as set])
- (:require [org.apache.storm.daemon.acker :as acker])
(:require [metrics.reporters.jmx :as jmx])
(:require [metrics.core :refer [default-registry]]))
@@ -46,10 +46,10 @@
(start-metrics-reporter reporter conf)))
-(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
-(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
-(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
-(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
+(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
+(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
+(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
+(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
(def SYSTEM-STREAM-ID "__system")
@@ -222,10 +222,13 @@
))]
(merge spout-inputs bolt-inputs)))
+(defn mk-acker-bolt []
+ (Acker.))
+
(defn add-acker! [storm-conf ^StormTopology ret]
(let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
acker-bolt (Thrift/prepareSerializedBoltDetails (acker-inputs ret)
- (new org.apache.storm.daemon.acker)
+ (mk-acker-bolt)
{ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"])
ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"])
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 7817929..f04befc 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -50,7 +50,6 @@
(org.apache.storm.messaging IContext)
[org.json.simple JSONValue])
(:require [org.apache.storm [zookeeper :as zk]])
- (:require [org.apache.storm.daemon.acker :as acker])
(:use [org.apache.storm cluster util config log local-state-converter])
(:use [org.apache.storm.internal thrift]))
@@ -675,9 +674,9 @@
(.put "transferred" (AtomicInteger. 0))
(.put "processed" (AtomicInteger. 0))))
(with-var-roots
- [acker/mk-acker-bolt
- (let [old# acker/mk-acker-bolt]
- (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
+ [common/mk-acker-bolt
+ (let [old# common/mk-acker-bolt]
+ (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
;; critical that this particular function is overridden here,
;; since the transferred stat needs to be incremented at the moment
;; of tuple emission (and not on a separate thread later) for
@@ -692,8 +691,8 @@
(increment-global! id# "transferred" 1)
(apply transferrer# args2#)))))]
(with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
- (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
- ~@body)))
+ (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+ ~@body)))
(RegisteredGlobalState/clearState id#)))
(defn tracked-wait
http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
new file mode 100644
index 0000000..98f73df
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bolt that tracks tuple trees on behalf of spouts.
+ *
+ * For every root id it keeps a running XOR of the values received on the
+ * init and ack streams. Once the spout task is known, a value of zero makes
+ * the acker emit the id on ACKER_ACK_STREAM_ID to that task; a recorded
+ * failure with a non-zero value makes it emit the id on ACKER_FAIL_STREAM_ID.
+ * Entries live in a RotatingMap that is rotated on every tick tuple.
+ */
+public class Acker implements IBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
+
+    private static final long serialVersionUID = 4430906880683183091L;
+
+    public static final String ACKER_COMPONENT_ID = "__acker";
+    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+
+    public static final int TIMEOUT_BUCKET_NUM = 3;
+
+    private OutputCollector collector;
+    private RotatingMap<Object, AckObject> pending;
+
+    /**
+     * Per-root-id bookkeeping. Static because it never touches the enclosing
+     * Acker instance.
+     */
+    private static class AckObject {
+        public long val = 0L;
+        public Integer spoutTask = null;
+        public boolean failed = false;
+
+        // val xor value
+        public void updateAck(Long value) {
+            val = Utils.bitXor(val, value);
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            // Rotating drops the oldest bucket; whatever was in it has timed out.
+            Map<Object, AckObject> tmp = pending.rotate();
+            LOG.debug("Number of timeout tuples:{}", tmp.size());
+            return;
+        }
+
+        String streamId = input.getSourceStreamId();
+        Object id = input.getValue(0);
+        AckObject curr = pending.get(id);
+        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                curr.spoutTask = input.getInteger(2);
+                pending.put(id, curr);
+            } else {
+                // A bolt's ack (or fail) arrived before the spout's init message:
+                // fold the init value into what has been recorded so far.
+                curr.updateAck(input.getLong(1));
+                curr.spoutTask = input.getInteger(2);
+            }
+        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+            if (curr != null) {
+                curr.updateAck(input.getLong(1));
+            } else {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                pending.put(id, curr);
+            }
+        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                // The fail may have overtaken the init message. Remember it so the
+                // spout is notified as soon as its task id becomes known; if the
+                // tuple had in fact already completed or timed out, the entry
+                // simply expires with the next rotations.
+                curr = new AckObject();
+                pending.put(id, curr);
+            }
+            curr.failed = true;
+        } else {
+            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+            return;
+        }
+
+        // Nothing can be reported until the init message has told us the spout task.
+        Integer task = curr.spoutTask;
+        if (task != null) {
+            if (curr.val == 0) {
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
+            } else {
+                if (curr.failed) {
+                    pending.remove(id);
+                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
+                }
+            }
+        }
+
+        collector.ack(input);
+    }
+
+    @Override
+    public void cleanup() {
+        LOG.info("Acker: cleanup successfully");
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
deleted file mode 100644
index 7c1514f..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import org.apache.storm.task.IBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.RotatingMap;
-import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class AckerBolt implements IBolt {
- private static final Logger LOG = LoggerFactory.getLogger(AckerBolt.class);
-
- private static final long serialVersionUID = 4430906880683183091L;
-
- public static final String ACKER_COMPONENT_ID = "__acker";
- public static final String ACKER_INIT_STREAM_ID = "__ack_init";
- public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
- public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
-
- public static final int TIMEOUT_BUCKET_NUM = 3;
-
- private OutputCollector collector;
- private RotatingMap<Object, AckObject> pending;
-
- private class AckObject {
- public long val = 0L;
- public Integer spoutTask = null;
- public boolean failed = false;
-
- // val xor value
- public void updateAck(Object value) {
- val = Utils.bitXor(val, value);
- }
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
- this.collector = collector;
- this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
- }
-
- @Override
- public void execute(Tuple input) {
- if (TupleUtils.isTick(input)) {
- Map<Object, AckObject> tmp = pending.rotate();
- LOG.debug("Number of timeout tuples:{}", tmp.size());
- return;
- }
-
- String streamId = input.getSourceStreamId();
- Object id = input.getValue(0);
- AckObject curr = pending.get(id);
- if (ACKER_INIT_STREAM_ID.equals(streamId)) {
- if (curr == null) {
- curr = new AckObject();
- curr.val = input.getLong(1);
- curr.spoutTask = input.getInteger(2);
- pending.put(id, curr);
- } else {
- // If receiving bolt's ack before the init message from spout, just update the xor value.
- curr.updateAck(input.getLong(1));
- curr.spoutTask = input.getInteger(2);
- }
- } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
- if (curr != null) {
- curr.updateAck(input.getLong(1));
- } else {
- curr = new AckObject();
- curr.val = input.getLong(1);
- pending.put(id, curr);
- }
- } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
- if (curr == null) {
- // The tuple has been already timeout or failed. So, do nothing
- return;
- }
- curr.failed = true;
- } else {
- LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
- return;
- }
-
- Integer task = curr.spoutTask;
- if (task != null) {
- if (curr.val == 0) {
- pending.remove(id);
- collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
- } else {
- if (curr.failed) {
- pending.remove(id);
- collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
- }
- }
- }
-
- collector.ack(input);
- }
-
- @Override
- public void cleanup() {
- LOG.info("Acker: cleanup successfully");
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index c2c5e62..43c00fc 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2278,7 +2278,7 @@ public class Utils {
return process;
}
- public static long bitXor(Object a, Object b) {
- return ((Long) a) ^ ((Long) b);
+ public static long bitXor(Long a, Long b) {
+ return a ^ b;
}
}
[09/13] storm git commit: Merge remote-tracking branch
'upstream/master'
Posted by lo...@apache.org.
Merge remote-tracking branch 'upstream/master'
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f54f2cf4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f54f2cf4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f54f2cf4
Branch: refs/heads/master
Commit: f54f2cf4e72091fd75c2b3f365caf3dcf31735e1
Parents: 4e26f06 4ca7522
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 22 10:22:22 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 22 10:22:22 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 18 +
README.markdown | 1 +
bin/storm-config.cmd | 4 +
bin/storm.cmd | 24 +-
bin/storm.py | 12 +-
conf/cgconfig.conf.example | 41 +++
conf/defaults.yaml | 16 +-
examples/storm-starter/pom.xml | 10 +
.../org/apache/storm/starter/clj/word_count.clj | 3 +-
.../starter/ResourceAwareExampleTopology.java | 2 +-
.../spout/RandomNumberGeneratorSpout.java | 95 +++++
.../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++
.../TridentMinMaxOfVehiclesTopology.java | 180 ++++++++++
external/storm-hdfs/pom.xml | 23 +-
pom.xml | 16 +
storm-clojure/pom.xml | 74 ++++
.../src/clj/org/apache/storm/clojure.clj | 207 +++++++++++
.../src/clj/org/apache/storm/thrift.clj | 286 +++++++++++++++
storm-clojure/src/test/clj/clojure_test.clj | 158 +++++++++
storm-core/pom.xml | 9 +
storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
.../clj/org/apache/storm/command/activate.clj | 24 --
.../clj/org/apache/storm/command/deactivate.clj | 24 --
.../org/apache/storm/command/dev_zookeeper.clj | 28 --
.../clj/org/apache/storm/command/get_errors.clj | 3 +-
.../org/apache/storm/command/healthcheck.clj | 90 -----
.../org/apache/storm/command/kill_topology.clj | 29 --
.../src/clj/org/apache/storm/command/list.clj | 38 --
.../clj/org/apache/storm/command/monitor.clj | 2 +-
.../clj/org/apache/storm/command/rebalance.clj | 3 +-
.../org/apache/storm/command/set_log_level.clj | 3 +-
.../apache/storm/command/shell_submission.clj | 2 +-
.../src/clj/org/apache/storm/daemon/common.clj | 121 ++++---
.../clj/org/apache/storm/daemon/executor.clj | 114 +++---
.../clj/org/apache/storm/daemon/logviewer.clj | 19 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 104 +++---
.../clj/org/apache/storm/daemon/supervisor.clj | 251 +++++++++----
.../src/clj/org/apache/storm/daemon/task.clj | 4 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 170 +++++----
.../src/clj/org/apache/storm/disruptor.clj | 89 -----
storm-core/src/clj/org/apache/storm/event.clj | 71 ----
.../clj/org/apache/storm/internal/clojure.clj | 201 +++++++++++
.../clj/org/apache/storm/internal/thrift.clj | 96 +++++
.../src/clj/org/apache/storm/local_state.clj | 134 -------
.../org/apache/storm/local_state_converter.clj | 24 ++
storm-core/src/clj/org/apache/storm/testing.clj | 37 +-
storm-core/src/clj/org/apache/storm/thrift.clj | 286 ---------------
storm-core/src/clj/org/apache/storm/timer.clj | 128 -------
storm-core/src/clj/org/apache/storm/ui/core.clj | 8 +-
storm-core/src/jvm/org/apache/storm/Config.java | 88 +++++
.../src/jvm/org/apache/storm/StormTimer.java | 241 +++++++++++++
storm-core/src/jvm/org/apache/storm/Thrift.java | 351 ++++++++++++++++++
.../jvm/org/apache/storm/command/Activate.java | 40 +++
.../src/jvm/org/apache/storm/command/CLI.java | 353 +++++++++++++++++++
.../org/apache/storm/command/Deactivate.java | 40 +++
.../org/apache/storm/command/DevZookeeper.java | 35 ++
.../org/apache/storm/command/HealthCheck.java | 125 +++++++
.../org/apache/storm/command/KillTopology.java | 51 +++
.../src/jvm/org/apache/storm/command/List.java | 50 +++
.../container/ResourceIsolationInterface.java | 51 +++
.../storm/container/cgroup/CgroupCenter.java | 216 ++++++++++++
.../storm/container/cgroup/CgroupCommon.java | 270 ++++++++++++++
.../container/cgroup/CgroupCommonOperation.java | 81 +++++
.../container/cgroup/CgroupCoreFactory.java | 74 ++++
.../storm/container/cgroup/CgroupManager.java | 210 +++++++++++
.../storm/container/cgroup/CgroupOperation.java | 79 +++++
.../storm/container/cgroup/CgroupUtils.java | 118 +++++++
.../apache/storm/container/cgroup/Device.java | 75 ++++
.../storm/container/cgroup/Hierarchy.java | 130 +++++++
.../storm/container/cgroup/SubSystem.java | 81 +++++
.../storm/container/cgroup/SubSystemType.java | 36 ++
.../storm/container/cgroup/SystemOperation.java | 75 ++++
.../storm/container/cgroup/core/BlkioCore.java | 213 +++++++++++
.../storm/container/cgroup/core/CgroupCore.java | 26 ++
.../storm/container/cgroup/core/CpuCore.java | 135 +++++++
.../container/cgroup/core/CpuacctCore.java | 71 ++++
.../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
.../container/cgroup/core/DevicesCore.java | 189 ++++++++++
.../container/cgroup/core/FreezerCore.java | 66 ++++
.../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
.../storm/container/cgroup/core/NetClsCore.java | 69 ++++
.../container/cgroup/core/NetPrioCore.java | 65 ++++
.../org/apache/storm/event/EventManager.java | 24 ++
.../org/apache/storm/event/EventManagerImp.java | 97 +++++
.../jvm/org/apache/storm/testing/NGrouping.java | 4 +-
.../storm/testing/PythonShellMetricsBolt.java | 14 +-
.../storm/testing/PythonShellMetricsSpout.java | 8 +-
.../jvm/org/apache/storm/trident/Stream.java | 121 ++++++-
.../operation/builtin/ComparisonAggregator.java | 91 +++++
.../storm/trident/operation/builtin/Max.java | 37 ++
.../operation/builtin/MaxWithComparator.java | 51 +++
.../storm/trident/operation/builtin/Min.java | 36 ++
.../operation/builtin/MinWithComparator.java | 51 +++
.../org/apache/storm/utils/DisruptorQueue.java | 15 +-
.../jvm/org/apache/storm/utils/LocalState.java | 112 +++++-
.../org/apache/storm/utils/NimbusClient.java | 19 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 37 +-
.../org/apache/storm/integration_test.clj | 259 ++++++++------
.../org/apache/storm/testing4j_test.clj | 72 ++--
.../test/clj/org/apache/storm/clojure_test.clj | 64 ++--
.../test/clj/org/apache/storm/cluster_test.clj | 3 +-
.../test/clj/org/apache/storm/drpc_test.clj | 23 +-
.../test/clj/org/apache/storm/grouping_test.clj | 56 +--
.../storm/messaging/netty_integration_test.clj | 18 +-
.../clj/org/apache/storm/messaging_test.clj | 14 +-
.../test/clj/org/apache/storm/metrics_test.clj | 85 +++--
.../test/clj/org/apache/storm/nimbus_test.clj | 260 +++++++++-----
.../scheduler/resource_aware_scheduler_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 175 ++++-----
.../clj/org/apache/storm/tick_tuple_test.clj | 15 +-
.../clj/org/apache/storm/transactional_test.clj | 3 +-
.../test/jvm/org/apache/storm/TestCgroups.java | 130 +++++++
.../jvm/org/apache/storm/command/TestCLI.java | 59 ++++
.../resource/TestResourceAwareScheduler.java | 3 +
114 files changed, 7747 insertions(+), 2003 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/f54f2cf4/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
[12/13] storm git commit: Merge remote-tracking branch
'upstream/master'
Posted by lo...@apache.org.
Merge remote-tracking branch 'upstream/master'
Conflicts:
storm-core/src/clj/org/apache/storm/testing.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e433c83
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e433c83
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e433c83
Branch: refs/heads/master
Commit: 6e433c83c9ab581d8cbe9f7b42b97ffb8e25a0b4
Parents: 14b993a 07629c1
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 28 20:23:29 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 28 20:23:29 2016 +0800
----------------------------------------------------------------------
CHANGELOG.md | 21 +-
README.markdown | 3 +-
bin/flight.bash | 4 +-
bin/storm.cmd | 2 +-
bin/storm.py | 6 +-
conf/defaults.yaml | 4 +-
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 10 +-
.../storm/hdfs/bolt/AvroGenericRecordBolt.java | 8 +-
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 8 +-
.../storm/hdfs/bolt/SequenceFileBolt.java | 8 +-
.../org/apache/storm/kafka/IntSerializer.java | 10 +-
.../apache/storm/kafka/PartitionManager.java | 5 +-
.../kafka/trident/TridentKafkaEmitter.java | 5 +-
.../src/clj/org/apache/storm/thrift.clj | 2 +-
storm-core/src/clj/org/apache/storm/cluster.clj | 700 -------------------
.../cluster_state/zookeeper_state_factory.clj | 165 -----
.../clj/org/apache/storm/command/heartbeats.clj | 6 +-
.../clj/org/apache/storm/command/monitor.clj | 37 -
.../clj/org/apache/storm/command/rebalance.clj | 47 --
.../org/apache/storm/command/set_log_level.clj | 76 --
.../apache/storm/command/shell_submission.clj | 2 +-
.../src/clj/org/apache/storm/converter.clj | 23 +-
.../src/clj/org/apache/storm/daemon/common.clj | 12 +-
.../src/clj/org/apache/storm/daemon/drpc.clj | 38 +-
.../clj/org/apache/storm/daemon/executor.clj | 13 +-
.../clj/org/apache/storm/daemon/logviewer.clj | 65 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 165 +++--
.../clj/org/apache/storm/daemon/supervisor.clj | 46 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 68 +-
.../clj/org/apache/storm/internal/thrift.clj | 2 +-
.../storm/pacemaker/pacemaker_state_factory.clj | 141 ----
.../clj/org/apache/storm/process_simulator.clj | 49 --
storm-core/src/clj/org/apache/storm/stats.clj | 3 +-
storm-core/src/clj/org/apache/storm/testing.clj | 21 +-
storm-core/src/clj/org/apache/storm/ui/core.clj | 128 ++--
.../src/clj/org/apache/storm/ui/helpers.clj | 199 +-----
storm-core/src/clj/org/apache/storm/util.clj | 11 +
.../src/clj/org/apache/storm/zookeeper.clj | 74 --
storm-core/src/jvm/org/apache/storm/Config.java | 9 +
.../jvm/org/apache/storm/ProcessSimulator.java | 82 +++
.../storm/blobstore/LocalFsBlobStore.java | 2 +-
.../jvm/org/apache/storm/callback/Callback.java | 23 -
.../storm/callback/ZKStateChangedCallback.java | 25 +
.../org/apache/storm/cluster/ClusterState.java | 217 ------
.../storm/cluster/ClusterStateContext.java | 2 +-
.../storm/cluster/ClusterStateFactory.java | 28 -
.../org/apache/storm/cluster/ClusterUtils.java | 244 +++++++
.../org/apache/storm/cluster/ExecutorBeat.java | 44 ++
.../org/apache/storm/cluster/IStateStorage.java | 222 ++++++
.../storm/cluster/IStormClusterState.java | 124 ++++
.../storm/cluster/PaceMakerStateStorage.java | 216 ++++++
.../cluster/PaceMakerStateStorageFactory.java | 64 ++
.../storm/cluster/StateStorageFactory.java | 28 +
.../storm/cluster/StormClusterStateImpl.java | 692 ++++++++++++++++++
.../apache/storm/cluster/ZKStateStorage.java | 244 +++++++
.../storm/cluster/ZKStateStorageFactory.java | 36 +
.../src/jvm/org/apache/storm/command/CLI.java | 34 +-
.../jvm/org/apache/storm/command/Monitor.java | 65 ++
.../jvm/org/apache/storm/command/Rebalance.java | 86 +++
.../org/apache/storm/command/SetLogLevel.java | 116 +++
.../storm/metric/FileBasedEventLogger.java | 18 +-
.../apache/storm/pacemaker/PacemakerClient.java | 6 +-
.../security/auth/ThriftConnectionType.java | 2 +-
.../serialization/SerializationFactory.java | 2 +
.../testing/staticmocking/MockedCluster.java | 31 +
.../MockedPaceMakerStateStorageFactory.java | 32 +
.../apache/storm/topology/TopologyBuilder.java | 13 +-
.../apache/storm/trident/tuple/ConsList.java | 20 +-
.../apache/storm/ui/FilterConfiguration.java | 63 ++
.../jvm/org/apache/storm/ui/IConfigurator.java | 24 +
.../src/jvm/org/apache/storm/ui/UIHelpers.java | 267 +++++++
.../jvm/org/apache/storm/utils/ConfigUtils.java | 2 +-
.../src/jvm/org/apache/storm/utils/Time.java | 1 +
.../src/jvm/org/apache/storm/utils/Utils.java | 78 ++-
.../storm/utils/WorkerBackpressureCallback.java | 2 +-
.../storm/utils/WorkerBackpressureThread.java | 38 +-
.../org/apache/storm/zookeeper/Zookeeper.java | 77 +-
storm-core/src/ui/public/component.html | 2 +-
.../templates/topology-page-template.html | 6 +-
storm-core/src/ui/public/topology.html | 2 +-
.../org/apache/storm/integration_test.clj | 13 +-
.../test/clj/org/apache/storm/cluster_test.clj | 202 +++---
.../storm/messaging/netty_integration_test.clj | 1 +
.../test/clj/org/apache/storm/nimbus_test.clj | 158 +++--
.../storm/pacemaker_state_factory_test.clj | 121 ++--
.../storm/security/auth/nimbus_auth_test.clj | 3 +-
.../clj/org/apache/storm/supervisor_test.clj | 163 +++--
.../test/clj/org/apache/storm/utils_test.clj | 111 ---
.../org/apache/storm/command/RebalanceTest.java | 41 ++
.../apache/storm/command/SetLogLevelTest.java | 54 ++
.../jvm/org/apache/storm/command/TestCLI.java | 44 +-
.../storm/topology/TopologyBuilderTest.java | 65 ++
.../jvm/org/apache/storm/utils/TimeTest.java | 112 +++
.../jvm/org/apache/storm/utils/UtilsTest.java | 219 ++++++
.../utils/WorkerBackpressureThreadTest.java | 50 ++
.../storm/utils/staticmocking/package-info.java | 2 +-
96 files changed, 4198 insertions(+), 2637 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6e433c83/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6e433c83/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index 804278c,8242c3e..66fc051
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -49,8 -50,9 +50,8 @@@
(:import [org.apache.storm.task TopologyContext]
(org.apache.storm.messaging IContext)
[org.json.simple JSONValue])
- (:require [org.apache.storm [zookeeper :as zk]])
- (:use [org.apache.storm cluster util config log local-state-converter])
+ (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
- (:require [org.apache.storm.daemon.acker :as acker])
+ (:use [org.apache.storm util config log local-state-converter converter])
(:use [org.apache.storm.internal thrift]))
(defn feeder-spout
http://git-wip-us.apache.org/repos/asf/storm/blob/6e433c83/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
[08/13] storm git commit: Revert some code format problems caused by
auto merging
Posted by lo...@apache.org.
Revert some code format problems caused by auto merging
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4e26f06b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4e26f06b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4e26f06b
Branch: refs/heads/master
Commit: 4e26f06b2d7d343953985ed611b58193c4246bbb
Parents: 6c0f4f0
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 14 14:33:44 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 14 14:33:44 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/acker.clj | 3 ++-
.../src/clj/org/apache/storm/daemon/common.clj | 6 ++---
.../src/jvm/org/apache/storm/utils/Utils.java | 24 ++++++++++----------
3 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4e26f06b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 7e17d40..9aa15ae 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -44,7 +44,8 @@
(defn -prepare [^org.apache.storm.daemon.acker this conf context collector]
(let [^IBolt ret (mk-acker-bolt)]
(.. this state (set ret))
- (.prepare ret conf context collector)))
+ (.prepare ret conf context collector)
+ ))
(defn -execute [^org.apache.storm.daemon.acker this tuple]
(let [^IBolt delegate (.. this state (get))]
http://git-wip-us.apache.org/repos/asf/storm/blob/4e26f06b/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 42fa1fa..eb1ec1e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -28,7 +28,7 @@
(:import [org.apache.storm.security.auth IAuthorizer])
(:import [java.io InterruptedIOException]
[org.json.simple JSONValue])
- (:require [clojure.set :as set])
+ (:require [clojure.set :as set])
(:require [org.apache.storm.daemon.acker :as acker])
(:require [org.apache.storm.thrift :as thrift])
(:require [metrics.core :refer [default-registry]]))
@@ -144,8 +144,8 @@
(defn component-conf [component]
(->> component
- .get_common
- .get_json_conf
+ .get_common
+ .get_json_conf
(#(if % (JSONValue/parse %)))
clojurify-structure))
http://git-wip-us.apache.org/repos/asf/storm/blob/4e26f06b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 44fb1a1..d098d83 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -512,7 +512,7 @@ public class Utils {
if(store != null) {
// store can be null during testing when mocking utils.
- store.prepare(nconf, baseDir, nimbusInfo);
+ store.prepare(nconf, baseDir, nimbusInfo);
}
return store;
}
@@ -946,17 +946,17 @@ public class Utils {
inputStream = new BufferedInputStream(new FileInputStream(inFile));
}
try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
- for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
- unpackEntries(tis, entry, untarDir);
- entry = tis.getNextTarEntry();
- }
+ for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+ unpackEntries(tis, entry, untarDir);
+ entry = tis.getNextTarEntry();
+ }
}
} finally {
if(inputStream != null) {
inputStream.close();
+ }
}
}
- }
private static void unpackEntries(TarArchiveInputStream tis,
TarArchiveEntry entry, File outputDir) throws IOException {
@@ -975,7 +975,7 @@ public class Utils {
if (!outputFile.getParentFile().exists()) {
if (!outputFile.getParentFile().mkdirs()) {
throw new IOException("Mkdirs failed to create tar internal dir "
- + outputDir);
+ + outputDir);
}
}
int count;
@@ -1190,11 +1190,11 @@ public class Utils {
}
String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
if (stormZKUser == null) {
- throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+ throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
}
- String[] split = stormZKUser.split(":",2);
+ String[] split = stormZKUser.split(":", 2);
if (split.length != 2) {
- throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+ throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
}
ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
@@ -1258,7 +1258,7 @@ public class Utils {
return dump.toString();
}
- /*
+ /**
* Creates an instance of the pluggable SerializationDelegate or falls back to
* DefaultSerializationDelegate if something goes wrong.
* @param stormConf The config from which to pull the name of the pluggable class.
@@ -1316,7 +1316,7 @@ public class Utils {
if (!file.getParentFile().mkdirs()) {
if (!file.getParentFile().isDirectory()) {
throw new IOException("Mkdirs failed to create " +
- file.getParentFile().toString());
+ file.getParentFile().toString());
}
}
OutputStream out = new FileOutputStream(file);
[04/13] storm git commit: Acker bolt should return after processing
timeout tick tuple
Posted by lo...@apache.org.
Acker bolt should return after processing timeout tick tuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c4dfa33c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c4dfa33c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c4dfa33c
Branch: refs/heads/master
Commit: c4dfa33c58ac8e1d3c6197b86b2976b9669a39f3
Parents: e50a312
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Feb 3 18:43:17 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Feb 3 18:43:17 2016 +0800
----------------------------------------------------------------------
storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java | 3 +--
1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c4dfa33c/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
index 80ed4ca..a4f6815 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -67,6 +67,7 @@ public class AckerBolt implements IBolt {
if (TupleUtils.isTick(input)) {
Map<Object, AckObject> tmp = pending.rotate();
LOG.debug("Number of timeout tuples:{}", tmp.size());
+ return;
}
String streamId = input.getSourceStreamId();
@@ -115,8 +116,6 @@ public class AckerBolt implements IBolt {
collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
}
}
- } else {
-
}
collector.ack(input);