You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by lo...@apache.org on 2016/03/01 00:58:24 UTC

[01/13] storm git commit: [STORM-1245] port backtype.storm.daemon.acker to java

Repository: storm
Updated Branches:
  refs/heads/master 07629c1f8 -> c8138dc72


[STORM-1245] port backtype.storm.daemon.acker to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9dc271f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9dc271f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9dc271f

Branch: refs/heads/master
Commit: e9dc271f11e311eea2269a8f6035e2c322d8d520
Parents: 66d7a39
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 1 11:17:05 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 1 11:17:05 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/acker.clj   | 107 ---------------
 .../src/clj/org/apache/storm/daemon/common.clj  |  12 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  11 +-
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 133 +++++++++++++++++++
 .../src/jvm/org/apache/storm/utils/Utils.java   |  56 ++++++++
 5 files changed, 203 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
deleted file mode 100644
index 7c4d614..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ /dev/null
@@ -1,107 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.acker
-  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
-  (:import [org.apache.storm.tuple Tuple Fields])
-  (:import [org.apache.storm.utils RotatingMap MutableObject])
-  (:import [java.util List Map])
-  (:import [org.apache.storm Constants])
-  (:use [org.apache.storm config util log])
-  (:gen-class
-   :init init
-   :implements [org.apache.storm.task.IBolt]
-   :constructors {[] []}
-   :state state ))
-
-(def ACKER-COMPONENT-ID "__acker")
-(def ACKER-INIT-STREAM-ID "__ack_init")
-(def ACKER-ACK-STREAM-ID "__ack_ack")
-(def ACKER-FAIL-STREAM-ID "__ack_fail")
-
-(defn- update-ack [curr-entry val]
-  (let [old (get curr-entry :val 0)]
-    (assoc curr-entry :val (bit-xor old val))
-    ))
-
-(defn- acker-emit-direct [^OutputCollector collector ^Integer task ^String stream ^List values]
-  (.emitDirect collector task stream values)
-  )
-
-(defn mk-acker-bolt []
-  (let [output-collector (MutableObject.)
-        pending (MutableObject.)]
-    (reify IBolt
-      (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
-               (.setObject output-collector collector)
-               (.setObject pending (RotatingMap. 2))
-               )
-      (^void execute [this ^Tuple tuple]
-             (let [^RotatingMap pending (.getObject pending)
-                   stream-id (.getSourceStreamId tuple)]
-               (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
-                 (.rotate pending)
-                 (let [id (.getValue tuple 0)
-                       ^OutputCollector output-collector (.getObject output-collector)
-                       curr (.get pending id)
-                       curr (condp = stream-id
-                                ACKER-INIT-STREAM-ID (-> curr
-                                                         (update-ack (.getValue tuple 1))
-                                                         (assoc :spout-task (.getValue tuple 2)))
-                                ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
-                                ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
-                   (.put pending id curr)
-                   (when (and curr (:spout-task curr))
-                     (cond (= 0 (:val curr))
-                           (do
-                             (.remove pending id)
-                             (acker-emit-direct output-collector
-                                                (:spout-task curr)
-                                                ACKER-ACK-STREAM-ID
-                                                [id]
-                                                ))
-                           (:failed curr)
-                           (do
-                             (.remove pending id)
-                             (acker-emit-direct output-collector
-                                                (:spout-task curr)
-                                                ACKER-FAIL-STREAM-ID
-                                                [id]
-                                                ))
-                           ))
-                   (.ack output-collector tuple)
-                   ))))
-      (^void cleanup [this]
-        )
-      )))
-
-(defn -init []
-  [[] (container)])
-
-(defn -prepare [this conf context collector]
-  (let [^IBolt ret (mk-acker-bolt)]
-    (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
-    (.prepare ret conf context collector)
-    ))
-
-(defn -execute [this tuple]
-  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
-    (.execute delegate tuple)
-    ))
-
-(defn -cleanup [this]
-  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
-    (.cleanup delegate)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6c184fd..45e0582 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -15,6 +15,7 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.common
   (:use [org.apache.storm log config util])
+  (:import [org.apache.storm.daemon Acker])
   (:import [org.apache.storm.generated StormTopology
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils])
@@ -26,17 +27,16 @@
   (:import [org.apache.storm.security.auth IAuthorizer]) 
   (:import [java.io InterruptedIOException])
   (:require [clojure.set :as set])  
-  (:require [org.apache.storm.daemon.acker :as acker])
   (:require [org.apache.storm.thrift :as thrift])
   (:require [metrics.reporters.jmx :as jmx]))
 
 (defn start-metrics-reporters []
   (jmx/start (jmx/reporter {})))
 
-(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
-(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
-(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
-(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
+(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
+(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
+(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
+(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
 
 (def SYSTEM-STREAM-ID "__system")
 
@@ -207,7 +207,7 @@
 (defn add-acker! [storm-conf ^StormTopology ret]
   (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
         acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
-                                         (new org.apache.storm.daemon.acker)
+                                         (Acker. )
                                          {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                           ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
                                           }

http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index cc78659..08662ff 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -45,9 +45,9 @@
   (:import [org.apache.storm.tuple Tuple])
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.task TopologyContext])
+  (:import [org.apache.storm.daemon Acker])
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [org.apache.storm.messaging.loader :as msg-loader])
-  (:require [org.apache.storm.daemon.acker :as acker])
   (:use [org.apache.storm cluster util thrift config log local-state]))
 
 (defn feeder-spout
@@ -612,6 +612,11 @@
       (get key)
       .get))
 
+;; Temporary solution. It should be removed after migration.
+(defn mk-acker-bolt
+  []
+  (Acker.))
+
 (defmacro with-tracked-cluster
   [[cluster-sym & cluster-args] & body]
   `(let [id# (uuid)]
@@ -622,8 +627,8 @@
          (.put "transferred" (AtomicInteger. 0))
          (.put "processed" (AtomicInteger. 0))))
      (with-var-roots
-       [acker/mk-acker-bolt
-        (let [old# acker/mk-acker-bolt]
+       [mk-acker-bolt
+        (let [old# mk-acker-bolt]
           (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
         ;; critical that this particular function is overridden here,
         ;; since the transferred stat needs to be incremented at the moment

http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
new file mode 100644
index 0000000..1e38fd7
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class Acker implements IBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
+
+    private static final long serialVersionUID = 4430906880683183091L;
+
+    public static final String ACKER_COMPONENT_ID = "__acker";
+    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+
+    public static final int TIMEOUT_BUCKET_NUM = 3;
+
+    private OutputCollector collector;
+    private RotatingMap<Object, AckObject> pending;
+
+    private class AckObject {
+        public long val = 0L;
+        public Integer spoutTask = null;
+        public boolean failed = false;
+
+        // val xor value
+        public void updateAck(Object value) {
+            val = Utils.bitXor(val, value);
+        }
+    }
+
+    public Acker() {
+
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            Map<Object, AckObject> tmp = pending.rotate();
+            LOG.debug("Number of timeout tuples:{}", tmp.size());
+        }
+
+        String streamId = input.getSourceStreamId();
+        Object id = input.getValue(0);
+        AckObject curr = pending.get(id);
+        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                curr.spoutTask = input.getInteger(2);
+                pending.put(id, curr);
+            } else {
+                // If receiving bolt's ack before the init message from spout, just update the xor value.
+                curr.updateAck(input.getValue(1));
+                curr.spoutTask = input.getInteger(2);
+            }
+        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+            if (curr != null) {
+                curr.updateAck(input.getValue(1));
+            } else {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                pending.put(id, curr);
+            }
+        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                // The tuple has been already timeout or failed. So, do nothing
+                return;
+            }
+            curr.failed = true;
+        } else {
+            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+            return;
+        }
+
+        Integer task = curr.spoutTask;
+        if (task != null) {
+            if (curr.val == 0) {
+                pending.remove(id);
+                List values = Utils.mkList(id);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
+            } else {
+                if (curr.failed) {
+                    pending.remove(id);
+                    List values = Utils.mkList(id);
+                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
+                }
+            }
+        } else {
+
+        }
+
+        collector.ack(input);
+    }
+
+    @Override
+    public void cleanup() {
+
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e9dc271f/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 380f4dd..e3813f8 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1374,5 +1374,61 @@ public class Utils {
             return new RuntimeException(e);
         }
     }
+
+    public static <T> long bitXorValsSets(java.util.Set<T> vals) {
+        long rtn = 0l;
+        for (T n : vals) {
+            rtn = bitXor(rtn, n);
+        }
+        return rtn;
+    }
+
+    public static long bitXor(Object a, Object b) {
+        long rtn;
+
+        if (a instanceof Long && b instanceof Long) {
+            rtn = ((Long) a) ^ ((Long) b);
+            return rtn;
+        } else if (b instanceof Set) {
+            long bs = bitXorValsSets((Set) b);
+            return bitXor(a, bs);
+        } else if (a instanceof Set) {
+            long as = bitXorValsSets((Set) a);
+            return bitXor(as, b);
+        } else {
+            long ai = Long.parseLong(String.valueOf(a));
+            long bi = Long.parseLong(String.valueOf(b));
+            rtn = ai ^ bi;
+            return rtn;
+        }
+    }
+
+    public static <V> List<V> mkList(V... args) {
+        ArrayList<V> rtn = new ArrayList<V>();
+        for (V o : args) {
+            rtn.add(o);
+        }
+        return rtn;
+    }
+
+    public static <V> List<V> mkList(java.util.Set<V> args) {
+        ArrayList<V> rtn = new ArrayList<V>();
+        if (args != null) {
+            for (V o : args) {
+                rtn.add(o);
+            }
+        }
+        return rtn;
+    }
+
+    public static <V> List<V> mkList(Collection<V> args) {
+        ArrayList<V> rtn = new ArrayList<V>();
+        if (args != null) {
+            for (V o : args) {
+                rtn.add(o);
+            }
+        }
+        return rtn;
+    }
 }
 


[06/13] storm git commit: Update according to review comments

Posted by lo...@apache.org.
Update according to review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7ce9a3c9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7ce9a3c9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7ce9a3c9

Branch: refs/heads/master
Commit: 7ce9a3c9ebf460143ad75275e4108b0b32f0b206
Parents: a7d0289
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 14 13:03:40 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 14 13:03:40 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/acker.clj   |  8 +++---
 .../jvm/org/apache/storm/daemon/AckerBolt.java  | 13 +++++----
 .../src/jvm/org/apache/storm/utils/Utils.java   | 28 --------------------
 3 files changed, 10 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7ce9a3c9/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 39e6f55..9bd4f44 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -22,10 +22,10 @@
            (org.apache.storm.daemon AckerBolt))
   (:use [org.apache.storm config util])
   (:gen-class
-    :init init
-    :implements [org.apache.storm.task.IBolt]
-    :constructors {[] []}
-    :state state))
+   :init init
+   :implements [org.apache.storm.task.IBolt]
+   :constructors {[] []}
+   :state state))
 
 (def ACKER-COMPONENT-ID AckerBolt/ACKER_COMPONENT_ID)
 (def ACKER-INIT-STREAM-ID AckerBolt/ACKER_INIT_STREAM_ID)

http://git-wip-us.apache.org/repos/asf/storm/blob/7ce9a3c9/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
index 763b9a0..7c1514f 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -21,6 +21,7 @@ import org.apache.storm.task.IBolt;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.RotatingMap;
 import org.apache.storm.utils.TupleUtils;
 import org.apache.storm.utils.Utils;
@@ -81,12 +82,12 @@ public class AckerBolt implements IBolt {
                 pending.put(id, curr);
             } else {
                 // If receiving bolt's ack before the init message from spout, just update the xor value.
-                curr.updateAck(input.getValue(1));
+                curr.updateAck(input.getLong(1));
                 curr.spoutTask = input.getInteger(2);
             }
         } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
             if (curr != null) {
-                curr.updateAck(input.getValue(1));
+                curr.updateAck(input.getLong(1));
             } else {
                 curr = new AckObject();
                 curr.val = input.getLong(1);
@@ -107,13 +108,11 @@ public class AckerBolt implements IBolt {
         if (task != null) {
             if (curr.val == 0) {
                 pending.remove(id);
-                List values = Utils.makeList(id);
-                collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
             } else {
                 if (curr.failed) {
                     pending.remove(id);
-                    List values = Utils.makeList(id);
-                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
+                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
                 }
             }
         }
@@ -123,6 +122,6 @@ public class AckerBolt implements IBolt {
 
     @Override
     public void cleanup() {
-
+        LOG.info("Acker: cleanup successfully");
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/7ce9a3c9/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 9ca2ece..2361987 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1378,33 +1378,5 @@ public class Utils {
     public static long bitXor(Object a, Object b) {
         return  ((Long) a) ^ ((Long) b);
     }
-
-    public static <V> List<V> makeList(V... args) {
-        ArrayList<V> rtn = new ArrayList<V>();
-        for (V o : args) {
-            rtn.add(o);
-        }
-        return rtn;
-    }
-
-    public static <V> List<V> makeList(java.util.Set<V> args) {
-        ArrayList<V> rtn = new ArrayList<V>();
-        if (args != null) {
-            for (V o : args) {
-                rtn.add(o);
-            }
-        }
-        return rtn;
-    }
-
-    public static <V> List<V> makeList(Collection<V> args) {
-        ArrayList<V> rtn = new ArrayList<V>();
-        if (args != null) {
-            for (V o : args) {
-                rtn.add(o);
-            }
-        }
-        return rtn;
-    }
 }
 


[02/13] storm git commit: [Storm 1245] port backtype.storm.daemon.acker to java

Posted by lo...@apache.org.
[Storm 1245] port backtype.storm.daemon.acker to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/675b0c4f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/675b0c4f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/675b0c4f

Branch: refs/heads/master
Commit: 675b0c4f786838a13122b6743ca6c946aa1d63ee
Parents: e9dc271
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Feb 3 15:46:08 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Feb 3 15:46:08 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/acker.clj   |  57 ++++++++
 .../src/clj/org/apache/storm/daemon/common.clj  |  16 +--
 storm-core/src/clj/org/apache/storm/testing.clj |  11 +-
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 133 -------------------
 .../jvm/org/apache/storm/daemon/AckerBolt.java  | 129 ++++++++++++++++++
 5 files changed, 197 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
new file mode 100644
index 0000000..9902b35
--- /dev/null
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -0,0 +1,57 @@
+;; Licensed to the Apache Software Foundation (ASF) under one
+;; or more contributor license agreements.  See the NOTICE file
+;; distributed with this work for additional information
+;; regarding copyright ownership.  The ASF licenses this file
+;; to you under the Apache License, Version 2.0 (the
+;; "License"); you may not use this file except in compliance
+;; with the License.  You may obtain a copy of the License at
+;;
+;; http://www.apache.org/licenses/LICENSE-2.0
+;;
+;; Unless required by applicable law or agreed to in writing, software
+;; distributed under the License is distributed on an "AS IS" BASIS,
+;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+;; See the License for the specific language governing permissions and
+;; limitations under the License.
+(ns org.apache.storm.daemon.acker
+  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+  (:import [org.apache.storm.tuple Tuple Fields])
+  (:import [org.apache.storm.utils RotatingMap MutableObject])
+  (:import [java.util List Map])
+  (:import [org.apache.storm Constants]
+           (org.apache.storm.daemon AckerBolt))
+  (:use [org.apache.storm config util log])
+  (:gen-class
+    :init init
+    :implements [org.apache.storm.task.IBolt]
+    :constructors {[] []}
+    :state state))
+
+(def ACKER-COMPONENT-ID AckerBolt/ACKER_COMPONENT_ID)
+(def ACKER-INIT-STREAM-ID AckerBolt/ACKER_INIT_STREAM_ID)
+(def ACKER-ACK-STREAM-ID AckerBolt/ACKER_ACK_STREAM_ID)
+(def ACKER-FAIL-STREAM-ID AckerBolt/ACKER_FAIL_STREAM_ID)
+
+(defn mk-acker-bolt []
+  (let [output-collector (MutableObject.)
+        pending (MutableObject.)]
+    (log-message "Symbol AckerBolt"  (symbol "AckerBolt") )
+    (AckerBolt.)))
+
+(defn -init []
+  [[] (container)])
+
+(defn -prepare [this conf context collector]
+  (let [^IBolt ret (mk-acker-bolt)]
+    (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+    (.prepare ret conf context collector)))
+
+(defn -execute [this tuple]
+  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+    (.execute delegate tuple)
+    ))
+
+(defn -cleanup [this]
+  (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+    (.cleanup delegate)
+    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 45e0582..6ecc918 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -15,7 +15,6 @@
 ;; limitations under the License.
 (ns org.apache.storm.daemon.common
   (:use [org.apache.storm log config util])
-  (:import [org.apache.storm.daemon Acker])
   (:import [org.apache.storm.generated StormTopology
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils])
@@ -24,19 +23,20 @@
   (:import [org.apache.storm Constants])
   (:import [org.apache.storm.metric SystemBolt])
   (:import [org.apache.storm.metric EventLoggerBolt])
-  (:import [org.apache.storm.security.auth IAuthorizer]) 
+  (:import [org.apache.storm.security.auth IAuthorizer])
   (:import [java.io InterruptedIOException])
-  (:require [clojure.set :as set])  
+  (:require [clojure.set :as set])
+  (:require [org.apache.storm.daemon.acker :as acker])
   (:require [org.apache.storm.thrift :as thrift])
   (:require [metrics.reporters.jmx :as jmx]))
 
 (defn start-metrics-reporters []
   (jmx/start (jmx/reporter {})))
 
-(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
-(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
-(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
-(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
+(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
+(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
+(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
+(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
 
 (def SYSTEM-STREAM-ID "__system")
 
@@ -207,7 +207,7 @@
 (defn add-acker! [storm-conf ^StormTopology ret]
   (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
         acker-bolt (thrift/mk-bolt-spec* (acker-inputs ret)
-                                         (Acker. )
+                                         (new org.apache.storm.daemon.acker)
                                          {ACKER-ACK-STREAM-ID (thrift/direct-output-fields ["id"])
                                           ACKER-FAIL-STREAM-ID (thrift/direct-output-fields ["id"])
                                           }

http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 08662ff..cc78659 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -45,9 +45,9 @@
   (:import [org.apache.storm.tuple Tuple])
   (:import [org.apache.storm.generated StormTopology])
   (:import [org.apache.storm.task TopologyContext])
-  (:import [org.apache.storm.daemon Acker])
   (:require [org.apache.storm [zookeeper :as zk]])
   (:require [org.apache.storm.messaging.loader :as msg-loader])
+  (:require [org.apache.storm.daemon.acker :as acker])
   (:use [org.apache.storm cluster util thrift config log local-state]))
 
 (defn feeder-spout
@@ -612,11 +612,6 @@
       (get key)
       .get))
 
-;; Temporary solution. It should be removed after migration.
-(defn mk-acker-bolt
-  []
-  (Acker.))
-
 (defmacro with-tracked-cluster
   [[cluster-sym & cluster-args] & body]
   `(let [id# (uuid)]
@@ -627,8 +622,8 @@
          (.put "transferred" (AtomicInteger. 0))
          (.put "processed" (AtomicInteger. 0))))
      (with-var-roots
-       [mk-acker-bolt
-        (let [old# mk-acker-bolt]
+       [acker/mk-acker-bolt
+        (let [old# acker/mk-acker-bolt]
           (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
         ;; critical that this particular function is overridden here,
         ;; since the transferred stat needs to be incremented at the moment

http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
deleted file mode 100644
index 1e38fd7..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import org.apache.storm.task.IBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.utils.RotatingMap;
-import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class Acker implements IBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
-
-    private static final long serialVersionUID = 4430906880683183091L;
-
-    public static final String ACKER_COMPONENT_ID = "__acker";
-    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
-    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
-    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
-
-    public static final int TIMEOUT_BUCKET_NUM = 3;
-
-    private OutputCollector collector;
-    private RotatingMap<Object, AckObject> pending;
-
-    private class AckObject {
-        public long val = 0L;
-        public Integer spoutTask = null;
-        public boolean failed = false;
-
-        // val xor value
-        public void updateAck(Object value) {
-            val = Utils.bitXor(val, value);
-        }
-    }
-
-    public Acker() {
-
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        if (TupleUtils.isTick(input)) {
-            Map<Object, AckObject> tmp = pending.rotate();
-            LOG.debug("Number of timeout tuples:{}", tmp.size());
-        }
-
-        String streamId = input.getSourceStreamId();
-        Object id = input.getValue(0);
-        AckObject curr = pending.get(id);
-        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
-            if (curr == null) {
-                curr = new AckObject();
-                curr.val = input.getLong(1);
-                curr.spoutTask = input.getInteger(2);
-                pending.put(id, curr);
-            } else {
-                // If receiving bolt's ack before the init message from spout, just update the xor value.
-                curr.updateAck(input.getValue(1));
-                curr.spoutTask = input.getInteger(2);
-            }
-        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
-            if (curr != null) {
-                curr.updateAck(input.getValue(1));
-            } else {
-                curr = new AckObject();
-                curr.val = input.getLong(1);
-                pending.put(id, curr);
-            }
-        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
-            if (curr == null) {
-                // The tuple has been already timeout or failed. So, do nothing
-                return;
-            }
-            curr.failed = true;
-        } else {
-            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
-            return;
-        }
-
-        Integer task = curr.spoutTask;
-        if (task != null) {
-            if (curr.val == 0) {
-                pending.remove(id);
-                List values = Utils.mkList(id);
-                collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
-            } else {
-                if (curr.failed) {
-                    pending.remove(id);
-                    List values = Utils.mkList(id);
-                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
-                }
-            }
-        } else {
-
-        }
-
-        collector.ack(input);
-    }
-
-    @Override
-    public void cleanup() {
-
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/675b0c4f/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
new file mode 100644
index 0000000..80ed4ca
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class AckerBolt implements IBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(AckerBolt.class);
+
+    private static final long serialVersionUID = 4430906880683183091L;
+
+    public static final String ACKER_COMPONENT_ID = "__acker";
+    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+
+    public static final int TIMEOUT_BUCKET_NUM = 3;
+
+    private OutputCollector collector;
+    private RotatingMap<Object, AckObject> pending;
+
+    private class AckObject {
+        public long val = 0L;
+        public Integer spoutTask = null;
+        public boolean failed = false;
+
+        // val xor value
+        public void updateAck(Object value) {
+            val = Utils.bitXor(val, value);
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            Map<Object, AckObject> tmp = pending.rotate();
+            LOG.debug("Number of timeout tuples:{}", tmp.size());
+        }
+
+        String streamId = input.getSourceStreamId();
+        Object id = input.getValue(0);
+        AckObject curr = pending.get(id);
+        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                curr.spoutTask = input.getInteger(2);
+                pending.put(id, curr);
+            } else {
+                // If receiving bolt's ack before the init message from spout, just update the xor value.
+                curr.updateAck(input.getValue(1));
+                curr.spoutTask = input.getInteger(2);
+            }
+        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+            if (curr != null) {
+                curr.updateAck(input.getValue(1));
+            } else {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                pending.put(id, curr);
+            }
+        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                // The tuple has been already timeout or failed. So, do nothing
+                return;
+            }
+            curr.failed = true;
+        } else {
+            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+            return;
+        }
+
+        Integer task = curr.spoutTask;
+        if (task != null) {
+            if (curr.val == 0) {
+                pending.remove(id);
+                List values = Utils.mkList(id);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
+            } else {
+                if (curr.failed) {
+                    pending.remove(id);
+                    List values = Utils.mkList(id);
+                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
+                }
+            }
+        } else {
+
+        }
+
+        collector.ack(input);
+    }
+
+    @Override
+    public void cleanup() {
+
+    }
+}
\ No newline at end of file


[05/13] storm git commit: update according to review comments

Posted by lo...@apache.org.
update according to review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a7d0289c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a7d0289c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a7d0289c

Branch: refs/heads/master
Commit: a7d0289ca52f2a9f2d84ce1bf9ebc84d3c7dcd87
Parents: c4dfa33
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Thu Feb 4 19:47:35 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Thu Feb 4 19:47:35 2016 +0800

----------------------------------------------------------------------
 .../jvm/org/apache/storm/daemon/AckerBolt.java  |  4 +--
 .../src/jvm/org/apache/storm/utils/Utils.java   | 32 +++-----------------
 2 files changed, 6 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a7d0289c/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
index a4f6815..763b9a0 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -107,12 +107,12 @@ public class AckerBolt implements IBolt {
         if (task != null) {
             if (curr.val == 0) {
                 pending.remove(id);
-                List values = Utils.mkList(id);
+                List values = Utils.makeList(id);
                 collector.emitDirect(task, ACKER_ACK_STREAM_ID, values);
             } else {
                 if (curr.failed) {
                     pending.remove(id);
-                    List values = Utils.mkList(id);
+                    List values = Utils.makeList(id);
                     collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
                 }
             }

http://git-wip-us.apache.org/repos/asf/storm/blob/a7d0289c/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index e3813f8..9ca2ece 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1375,35 +1375,11 @@ public class Utils {
         }
     }
 
-    public static <T> long bitXorValsSets(java.util.Set<T> vals) {
-        long rtn = 0l;
-        for (T n : vals) {
-            rtn = bitXor(rtn, n);
-        }
-        return rtn;
-    }
-
     public static long bitXor(Object a, Object b) {
-        long rtn;
-
-        if (a instanceof Long && b instanceof Long) {
-            rtn = ((Long) a) ^ ((Long) b);
-            return rtn;
-        } else if (b instanceof Set) {
-            long bs = bitXorValsSets((Set) b);
-            return bitXor(a, bs);
-        } else if (a instanceof Set) {
-            long as = bitXorValsSets((Set) a);
-            return bitXor(as, b);
-        } else {
-            long ai = Long.parseLong(String.valueOf(a));
-            long bi = Long.parseLong(String.valueOf(b));
-            rtn = ai ^ bi;
-            return rtn;
-        }
+        return  ((Long) a) ^ ((Long) b);
     }
 
-    public static <V> List<V> mkList(V... args) {
+    public static <V> List<V> makeList(V... args) {
         ArrayList<V> rtn = new ArrayList<V>();
         for (V o : args) {
             rtn.add(o);
@@ -1411,7 +1387,7 @@ public class Utils {
         return rtn;
     }
 
-    public static <V> List<V> mkList(java.util.Set<V> args) {
+    public static <V> List<V> makeList(java.util.Set<V> args) {
         ArrayList<V> rtn = new ArrayList<V>();
         if (args != null) {
             for (V o : args) {
@@ -1421,7 +1397,7 @@ public class Utils {
         return rtn;
     }
 
-    public static <V> List<V> mkList(Collection<V> args) {
+    public static <V> List<V> makeList(Collection<V> args) {
         ArrayList<V> rtn = new ArrayList<V>();
         if (args != null) {
             for (V o : args) {


[13/13] storm git commit: Send failure response to spout instead of doing nothing, for the case that acker receives FAIL before INIT

Posted by lo...@apache.org.
Send failure response to spout instead of doing nothing, for the case that acker receives FAIL before INIT


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c8138dc7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c8138dc7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c8138dc7

Branch: refs/heads/master
Commit: c8138dc72d4f39fe1ebb09c42931d6cbd3198c7e
Parents: 6e433c8
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 28 21:50:09 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 28 21:50:09 2016 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/daemon/Acker.java | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c8138dc7/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
index 98f73df..7d05e24 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -94,18 +94,19 @@ public class Acker implements IBolt {
                 pending.put(id, curr);
             }
         } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            // For the case that ack_fail message arrives before ack_init
             if (curr == null) {
-                // The tuple has been already timeout or failed. So, do nothing
-                return;
+                curr = new AckObject();
             }
             curr.failed = true;
+            pending.put(id, curr);
         } else {
             LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
             return;
         }
 
         Integer task = curr.spoutTask;
-        if (task != null) {
+        if (curr != null && task != null) {
             if (curr.val == 0) {
                 pending.remove(id);
                 collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));


[07/13] storm git commit: Merge remote-tracking branch 'upstream/master'

Posted by lo...@apache.org.
Merge remote-tracking branch 'upstream/master'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6c0f4f01
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6c0f4f01
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6c0f4f01

Branch: refs/heads/master
Commit: 6c0f4f01aaef6c7a728db45c6d1b3726f2da1ab9
Parents: 7ce9a3c 12ceb09
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 14 13:47:48 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 14 13:47:48 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  17 +
 README.markdown                                 |   2 +
 bin/storm-config.cmd                            |   6 +-
 bin/storm.cmd                                   |  47 +-
 bin/storm.py                                    |   8 +-
 conf/defaults.yaml                              |   4 +
 dev-tools/travis/travis-script.sh               |   4 +-
 .../starter/trident/TridentMapExample.java      | 123 +++
 external/sql/storm-sql-core/pom.xml             |   9 +
 external/storm-elasticsearch/pom.xml            |   2 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 external/storm-hdfs/README.md                   |  15 +-
 external/storm-hdfs/pom.xml                     |  22 +
 .../storm/hdfs/avro/AbstractAvroSerializer.java |  80 ++
 .../storm/hdfs/avro/AvroSchemaRegistry.java     |  28 +
 .../org/apache/storm/hdfs/avro/AvroUtils.java   |  44 +
 .../hdfs/avro/ConfluentAvroSerializer.java      |  83 ++
 .../storm/hdfs/avro/FixedAvroSerializer.java    |  67 ++
 .../storm/hdfs/avro/GenericAvroSerializer.java  |  36 +
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |   4 -
 .../hdfs/avro/TestFixedAvroSerializer.java      |  76 ++
 .../hdfs/avro/TestGenericAvroSerializer.java    |  68 ++
 .../test/resources/FixedAvroSerializer.config   |   2 +
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |   8 +-
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  |  44 +-
 .../apache/storm/kafka/PartitionManager.java    |  42 +-
 .../kafka/trident/TridentKafkaEmitter.java      |  23 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 log4j2/cluster.xml                              |   2 +-
 log4j2/worker.xml                               |   2 +-
 pom.xml                                         |  12 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 storm-core/src/clj/org/apache/storm/cluster.clj |  27 +-
 .../cluster_state/zookeeper_state_factory.clj   |  14 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/config_value.clj   |  25 -
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  40 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  25 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 532 +++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  70 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 172 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 202 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  78 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  10 +-
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../clj/org/apache/storm/messaging/loader.clj   |  34 -
 .../clj/org/apache/storm/messaging/local.clj    |  23 -
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |  24 +-
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  89 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  99 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 923 +-----------------
 .../src/clj/org/apache/storm/zookeeper.clj      |   1 -
 storm-core/src/jvm/org/apache/storm/Config.java |  39 +
 .../org/apache/storm/command/ConfigValue.java   |  30 +
 .../storm/daemon/metrics/MetricsUtils.java      | 108 ++
 .../reporters/ConsolePreparableReporter.java    |  76 ++
 .../reporters/CsvPreparableReporter.java        |  80 ++
 .../reporters/JmxPreparableReporter.java        |  70 ++
 .../metrics/reporters/PreparableReporter.java   |  32 +
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/trident/Stream.java    |  87 +-
 .../storm/trident/operation/Consumer.java       |  35 +
 .../trident/operation/FlatMapFunction.java      |  37 +
 .../storm/trident/operation/MapFunction.java    |  36 +
 .../operation/impl/ConsumerExecutor.java        |  38 +
 .../operation/impl/FlatMapFunctionExecutor.java |  43 +
 .../operation/impl/MapFunctionExecutor.java     |  41 +
 .../trident/planner/processor/MapProcessor.java |  87 ++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../utils/StormConnectionStateConverter.java    |  44 +
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 973 ++++++++++++++++++-
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       | 100 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  20 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 131 ++-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 645 ++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 122 files changed, 4723 insertions(+), 2472 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6c0f4f01/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 9bd4f44,dc05dfc..7e17d40
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@@ -14,13 -14,13 +14,14 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns org.apache.storm.daemon.acker
-   (:import [org.apache.storm.task OutputCollector TopologyContext IBolt])
+   (:import [org.apache.storm.task OutputCollector TopologyContext IBolt]
+            [org.apache.storm.utils Utils])
    (:import [org.apache.storm.tuple Tuple Fields])
-   (:import [org.apache.storm.utils RotatingMap MutableObject])
+   (:import [org.apache.storm.utils Container RotatingMap MutableObject])
    (:import [java.util List Map])
 -  (:import [org.apache.storm Constants])
 +  (:import [org.apache.storm Constants]
 +           (org.apache.storm.daemon AckerBolt))
-   (:use [org.apache.storm config util])
+   (:use [org.apache.storm config log])
    (:gen-class
     :init init
     :implements [org.apache.storm.task.IBolt]
@@@ -35,18 -44,61 +36,18 @@@
  (defn mk-acker-bolt []
    (let [output-collector (MutableObject.)
          pending (MutableObject.)]
 -    (reify IBolt
 -      (^void prepare [this ^Map storm-conf ^TopologyContext context ^OutputCollector collector]
 -               (.setObject output-collector collector)
 -               (.setObject pending (RotatingMap. 2))
 -               )
 -      (^void execute [this ^Tuple tuple]
 -             (let [^RotatingMap pending (.getObject pending)
 -                   stream-id (.getSourceStreamId tuple)]
 -               (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
 -                 (.rotate pending)
 -                 (let [id (.getValue tuple 0)
 -                       ^OutputCollector output-collector (.getObject output-collector)
 -                       curr (.get pending id)
 -                       curr (condp = stream-id
 -                                ACKER-INIT-STREAM-ID (-> curr
 -                                                         (update-ack (.getValue tuple 1))
 -                                                         (assoc :spout-task (.getValue tuple 2)))
 -                                ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
 -                                ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
 -                   (.put pending id curr)
 -                   (when (and curr (:spout-task curr))
 -                     (cond (= 0 (:val curr))
 -                           (do
 -                             (.remove pending id)
 -                             (acker-emit-direct output-collector
 -                                                (:spout-task curr)
 -                                                ACKER-ACK-STREAM-ID
 -                                                [id]
 -                                                ))
 -                           (:failed curr)
 -                           (do
 -                             (.remove pending id)
 -                             (acker-emit-direct output-collector
 -                                                (:spout-task curr)
 -                                                ACKER-FAIL-STREAM-ID
 -                                                [id]
 -                                                ))
 -                           ))
 -                   (.ack output-collector tuple)
 -                   ))))
 -      (^void cleanup [this]
 -        )
 -      )))
 +    (AckerBolt.)))
  
  (defn -init []
-   [[] (container)])
+   [[] (Container.)])
  
- (defn -prepare [this conf context collector]
+ (defn -prepare [^org.apache.storm.daemon.acker this conf context collector]
    (let [^IBolt ret (mk-acker-bolt)]
-     (container-set! (.state ^org.apache.storm.daemon.acker this) ret)
+     (.. this state (set ret))
 -    (.prepare ret conf context collector)
 -    ))
 +    (.prepare ret conf context collector)))
  
- (defn -execute [this tuple]
-   (let [^IBolt delegate (container-get (.state ^org.apache.storm.daemon.acker this))]
+ (defn -execute [^org.apache.storm.daemon.acker this tuple]
+   (let [^IBolt delegate (.. this state (get))]
      (.execute delegate tuple)
      ))
  

http://git-wip-us.apache.org/repos/asf/storm/blob/6c0f4f01/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6ecc918,eb1ec1e..42fa1fa
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@@ -24,14 -26,23 +26,23 @@@
    (:import [org.apache.storm.metric SystemBolt])
    (:import [org.apache.storm.metric EventLoggerBolt])
    (:import [org.apache.storm.security.auth IAuthorizer])
-   (:import [java.io InterruptedIOException])
+   (:import [java.io InterruptedIOException]
+            [org.json.simple JSONValue])
 -  (:require [clojure.set :as set])  
 +  (:require [clojure.set :as set])
    (:require [org.apache.storm.daemon.acker :as acker])
    (:require [org.apache.storm.thrift :as thrift])
-   (:require [metrics.reporters.jmx :as jmx]))
+   (:require [metrics.core  :refer [default-registry]]))
+ 
+ (defn start-metrics-reporter [reporter conf]
+   (doto reporter
+     (.prepare default-registry conf)
+     (.start))
+   (log-message "Started statistics report plugin..."))
+ 
+ (defn start-metrics-reporters [conf]
+   (doseq [reporter (MetricsUtils/getPreparableReporters conf)]
+     (start-metrics-reporter reporter conf)))
  
- (defn start-metrics-reporters []
-   (jmx/start (jmx/reporter {})))
  
  (def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
  (def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
@@@ -134,9 -144,10 +144,10 @@@
  
  (defn component-conf [component]
    (->> component
 -       .get_common
 -       .get_json_conf
 +      .get_common
 +      .get_json_conf
-       from-json))
+        (#(if % (JSONValue/parse %)))
+        clojurify-structure))
  
  (defn validate-basic! [^StormTopology topology]
    (validate-ids! topology)

http://git-wip-us.apache.org/repos/asf/storm/blob/6c0f4f01/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 2361987,a0c0b1a..44fb1a1
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@@ -441,7 -509,11 +509,11 @@@ public class Utils 
          HashMap nconf = new HashMap(conf);
          // only enable cleanup of blobstore on nimbus
          nconf.put(Config.BLOBSTORE_CLEANUP_ENABLE, Boolean.TRUE);
+ 
+         if(store != null) {
+             // store can be null during testing when mocking utils.
 -            store.prepare(nconf, baseDir, nimbusInfo);
 +        store.prepare(nconf, baseDir, nimbusInfo);
+         }
          return store;
      }
  
@@@ -878,34 -945,18 +945,18 @@@
              } else {
                  inputStream = new BufferedInputStream(new FileInputStream(inFile));
              }
-             tis = new TarArchiveInputStream(inputStream);
+             try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
 -                for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
 -                    unpackEntries(tis, entry, untarDir);
 -                    entry = tis.getNextTarEntry();
 -                }
 +            for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
 +                unpackEntries(tis, entry, untarDir);
 +                entry = tis.getNextTarEntry();
 +            }
+             }
          } finally {
-             cleanup(tis, inputStream);
+             if(inputStream != null) {
+                 inputStream.close();
 -            }
          }
      }
- 
-     /**
-      * Close the Closeable objects and <b>ignore</b> any {@link IOException} or
-      * null pointers. Must only be used for cleanup in exception handlers.
-      *
-      * @param closeables the objects to close
-      */
-     private static void cleanup(java.io.Closeable... closeables) {
-         for (java.io.Closeable c : closeables) {
-             if (c != null) {
-                 try {
-                     c.close();
-                 } catch (IOException e) {
-                     LOG.debug("Exception in closing " + c, e);
- 
 +                }
-             }
-         }
-     }
  
      private static void unpackEntries(TarArchiveInputStream tis,
                                        TarArchiveEntry entry, File outputDir) throws IOException {
@@@ -1186,7 -1258,12 +1258,12 @@@
          return dump.toString();
      }
  
-     // Assumes caller is synchronizing
 -    /**
++    /*
+      * Creates an instance of the pluggable SerializationDelegate or falls back to
+      * DefaultSerializationDelegate if something goes wrong.
+      * @param stormConf The config from which to pull the name of the pluggable class.
+      * @return an instance of the class specified by storm.meta.serialization.delegate
+      */
      private static SerializationDelegate getSerializationDelegate(Map stormConf) {
          String delegateClassName = (String)stormConf.get(Config.STORM_META_SERIALIZATION_DELEGATE);
          SerializationDelegate delegate;
@@@ -1375,8 -1446,806 +1446,810 @@@
          }
      }
  
+     /**
+      * Determines if a zip archive contains a particular directory.
+      *
+      * @param zipfile path to the zipped file
+      * @param target directory being looked for in the zip.
+      * @return boolean whether or not the directory exists in the zip.
+      */
+     public static boolean zipDoesContainDir(String zipfile, String target) throws IOException {
+         List<ZipEntry> entries = (List<ZipEntry>)Collections.list(new ZipFile(zipfile).entries());
+ 
+         String targetDir = target + "/";
+         for(ZipEntry entry : entries) {
+             String name = entry.getName();
+             if(name.startsWith(targetDir)) {
+                 return true;
+             }
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * Joins any number of maps together into a single map, combining their values into
+      * a list, maintaining values in the order the maps were passed in. Nulls are inserted
+      * for given keys when the map does not contain that key.
+      *
+      * i.e. joinMaps({'a' => 1, 'b' => 2}, {'b' => 3}, {'a' => 4, 'c' => 5}) ->
+      *      {'a' => [1, null, 4], 'b' => [2, 3, null], 'c' => [null, null, 5]}
+      *
+      * @param maps variable number of maps to join - order affects order of values in output.
+      * @return combined map
+      */
+     public static <K, V> Map<K, List<V>> joinMaps(Map<K, V>... maps) {
+         Map<K, List<V>> ret = new HashMap<>();
+ 
+         Set<K> keys = new HashSet<>();
+ 
+         for(Map<K, V> map : maps) {
+             keys.addAll(map.keySet());
+         }
+ 
+         for(Map<K, V> m : maps) {
+             for(K key : keys) {
+                 V value = m.get(key);
+ 
+                 if(!ret.containsKey(key)) {
+                     ret.put(key, new ArrayList<V>());
+                 }
+ 
+                 List<V> targetList = ret.get(key);
+                 targetList.add(value);
+             }
+         }
+         return ret;
+     }
+ 
+     /**
+      * Fills up chunks out of a collection (given a maximum amount of chunks)
+      *
+      * i.e. partitionFixed(5, [1,2,3]) -> [[1,2,3]]
+      *      partitionFixed(5, [1..9]) -> [[1,2], [3,4], [5,6], [7,8], [9]]
+      *      partitionFixed(3, [1..10]) -> [[1,2,3,4], [5,6,7], [8,9,10]]
+      * @param maxNumChunks the maximum number of chunks to return
+      * @param coll the collection to be chunked up
+      * @return a list of the chunks, which are themselves lists.
+      */
+     public static <T> List<List<T>> partitionFixed(int maxNumChunks, Collection<T> coll) {
+         List<List<T>> ret = new ArrayList<>();
+ 
+         if(maxNumChunks == 0 || coll == null) {
+             return ret;
+         }
+ 
+         Map<Integer, Integer> parts = integerDivided(coll.size(), maxNumChunks);
+ 
+         // Keys sorted in descending order
+         List<Integer> sortedKeys = new ArrayList<Integer>(parts.keySet());
+         Collections.sort(sortedKeys, Collections.reverseOrder());
+ 
+ 
+         Iterator<T> it = coll.iterator();
+         for(Integer chunkSize : sortedKeys) {
+             if(!it.hasNext()) { break; }
+             Integer times = parts.get(chunkSize);
+             for(int i = 0; i < times; i++) {
+                 if(!it.hasNext()) { break; }
+                 List<T> chunkList = new ArrayList<>();
+                 for(int j = 0; j < chunkSize; j++) {
+                     if(!it.hasNext()) { break; }
+                     chunkList.add(it.next());
+                 }
+                 ret.add(chunkList);
+             }
+         }
+ 
+         return ret;
+     }
+ 
+     /**
+      * Return a new instance of a pluggable specified in the conf.
+      * @param conf The conf to read from.
+      * @param configKey The key pointing to the pluggable class
+      * @return an instance of the class or null if it is not specified.
+      */
+     public static Object getConfiguredClass(Map conf, Object configKey) {
+         if (conf.containsKey(configKey)) {
+             return newInstance((String)conf.get(configKey));
+         }
+         return null;
+     }
+ 
+     public static String logsFilename(String stormId, String port) {
+         return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "worker.log";
+     }
+ 
+     public static String eventLogsFilename(String stormId, String port) {
+         return stormId + FILE_PATH_SEPARATOR + port + FILE_PATH_SEPARATOR + "events.log";
+     }
+ 
+     public static Object readYamlFile(String yamlFile) {
+         try (FileReader reader = new FileReader(yamlFile)) {
+             return new Yaml(new SafeConstructor()).load(reader);
+         } catch(Exception ex) {
+             LOG.error("Failed to read yaml file.", ex);
+         }
+         return null;
+     }
+ 
+     public static void setupDefaultUncaughtExceptionHandler() {
+         Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                 public void uncaughtException(Thread thread, Throwable thrown) {
+                     try {
+                         handleUncaughtException(thrown);
+                     } catch (Error err) {
+                         LOG.error("Received error in main thread.. terminating server...", err);
+                         Runtime.getRuntime().exit(-2);
+                     }
+                 }
+             });
+     }
+ 
+     /**
+      * Creates a new map with a string value in the map replaced with an
+      * equivalently-lengthed string of '#'.
+      * @param m The map that a value will be redacted from
+      * @param key The key pointing to the value to be redacted
+      * @return a new map with the value redacted. The original map will not be modified.
+      */
+     public static Map<Object, String> redactValue(Map<Object, String> m, Object key) {
+         if(m.containsKey(key)) {
+             HashMap<Object, String> newMap = new HashMap<>(m);
+             String value = newMap.get(key);
+             String redacted = new String(new char[value.length()]).replace("\0", "#");
+             newMap.put(key, redacted);
+             return newMap;
+         }
+         return m;
+     }
+ 
+     /**
+      * Make sure a given key name is valid for the storm config.
+      * Throw RuntimeException if the key isn't valid.
+      * @param name The name of the config key to check.
+      */
+     private static final Set<String> disallowedKeys = new HashSet<>(Arrays.asList(new String[] {"/", ".", ":", "\\"}));
+     public static void validateKeyName(String name) {
+ 
+         for(String key : disallowedKeys) {
+             if( name.contains(key) ) {
+                 throw new RuntimeException("Key name cannot contain any of the following: " + disallowedKeys.toString());
+             }
+         }
+         if(name.trim().isEmpty()) {
+             throw new RuntimeException("Key name cannot be blank");
+         }
+     }
+ 
+     /**
+      * Find the first item of coll for which pred.test(...) returns true.
+      * @param pred The IPredicate to test for
+      * @param coll The Collection of items to search through.
+      * @return The first matching value in coll, or null if nothing matches.
+      */
+     public static <T> T findOne (IPredicate<T> pred, Collection<T> coll) {
+         if(coll == null) {
+             return null;
+         }
+         for(T elem : coll) {
+             if (pred.test(elem)) {
+                 return elem;
+             }
+         }
+         return null;
+     }
+ 
+     public static <T, U> T findOne (IPredicate<T> pred, Map<U, T> map) {
+         if(map == null) {
+             return null;
+         }
+         return findOne(pred, (Set<T>)map.entrySet());
+     }
+ 
+     public static String localHostname () throws UnknownHostException {
+         return _instance.localHostnameImpl();
+     }
+ 
+     // Non-static impl methods exist for mocking purposes.
+     protected String localHostnameImpl () throws UnknownHostException {
+         return InetAddress.getLocalHost().getCanonicalHostName();
+     }
+ 
+     private static String memoizedLocalHostnameString = null;
+ 
+     public static String memoizedLocalHostname () throws UnknownHostException {
+         if (memoizedLocalHostnameString == null) {
+             memoizedLocalHostnameString = localHostname();
+         }
+         return memoizedLocalHostnameString;
+     }
+ 
+     /**
+      * Gets the storm.local.hostname value, or tries to figure out the local hostname
+      * if it is not set in the config.
+      * @param conf The storm config to read from
+      * @return a string representation of the hostname.
+     */
+     public static String hostname (Map<String, Object> conf) throws UnknownHostException  {
+         if (conf == null) {
+             return memoizedLocalHostname();
+         }
+         Object hostnameString = conf.get(Config.STORM_LOCAL_HOSTNAME);
+         if (hostnameString == null || hostnameString.equals("")) {
+             return memoizedLocalHostname();
+         }
+         return (String)hostnameString;
+     }
+ 
+     public static String uuid() {
+         return UUID.randomUUID().toString();
+     }
+ 
+     public static void exitProcess (int val, Object... msg) {
+         StringBuilder errorMessage = new StringBuilder();
+         errorMessage.append("Halting process: ");
+         for (Object oneMessage: msg) {
+             errorMessage.append(oneMessage);
+         }
+         String combinedErrorMessage = errorMessage.toString();
+         LOG.error(combinedErrorMessage, new RuntimeException(combinedErrorMessage));
+         Runtime.getRuntime().exit(val);
+     }
+ 
+     /**
+      * "{:a 1 :b 1 :c 2} -> {1 [:a :b] 2 :c}"
+      *
+      * Example usage in java:
+      *  Map<Integer, String> tasks;
+      *  Map<String, List<Integer>> componentTasks = Utils.reverse_map(tasks);
+      *
+      * The order of he resulting list values depends on the ordering properties
+      * of the Map passed in. The caller is responsible for passing an ordered
+      * map if they expect the result to be consistently ordered as well.
+      *
+      * @param map to reverse
+      * @return a reversed map
+      */
+     public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
+         HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
+         if (map == null) {
+             return rtn;
+         }
+         for (Entry<K, V> entry : map.entrySet()) {
+             K key = entry.getKey();
+             V val = entry.getValue();
+             List<K> list = rtn.get(val);
+             if (list == null) {
+                 list = new ArrayList<K>();
+                 rtn.put(entry.getValue(), list);
+             }
+             list.add(key);
+         }
+         return rtn;
+     }
+ 
+     /**
+      * "[[:a 1] [:b 1] [:c 2]} -> {1 [:a :b] 2 :c}"
+      * Reverses an assoc-list style Map like reverseMap(Map...)
+      *
+      * @param listSeq to reverse
+      * @return a reversed map
+      */
+     public static HashMap reverseMap(List listSeq) {
+         HashMap<Object, List<Object>> rtn = new HashMap();
+         if (listSeq == null) {
+             return rtn;
+         }
+         for (Object entry : listSeq) {
+             List listEntry = (List) entry;
+             Object key = listEntry.get(0);
+             Object val = listEntry.get(1);
+             List list = rtn.get(val);
+             if (list == null) {
+                 list = new ArrayList<Object>();
+                 rtn.put(val, list);
+             }
+             list.add(key);
+         }
+         return rtn;
+     }
+ 
+ 
+     /**
+      * @return the pid of this JVM, because Java doesn't provide a real way to do this.
+      */
+     public static String processPid() {
+         String name = ManagementFactory.getRuntimeMXBean().getName();
+         String[] split = name.split("@");
+         if (split.length != 2) {
+             throw new RuntimeException("Got unexpected process name: " + name);
+         }
+         return split[0];
+     }
+ 
+     public static int execCommand(String... command) throws ExecuteException, IOException {
+         CommandLine cmd = new CommandLine(command[0]);
+         for (int i = 1; i < command.length; i++) {
+             cmd.addArgument(command[i]);
+         }
+ 
+         DefaultExecutor exec = new DefaultExecutor();
+         return exec.execute(cmd);
+     }
+ 
+     /**
+      * Extract dir from the jar to destdir
+      *
+      * @param jarpath Path to the jar file
+      * @param dir Directory in the jar to pull out
+      * @param destdir Path to the directory where the extracted directory will be put
+      *
+      */
+     public static void extractDirFromJar(String jarpath, String dir, String destdir) {
+         try (JarFile jarFile = new JarFile(jarpath)) {
+             Enumeration<JarEntry> jarEnums = jarFile.entries();
+             while (jarEnums.hasMoreElements()) {
+                 JarEntry entry = jarEnums.nextElement();
+                 if (!entry.isDirectory() && entry.getName().startsWith(dir)) {
+                     File aFile = new File(destdir, entry.getName());
+                     aFile.getParentFile().mkdirs();
+                     try (FileOutputStream out = new FileOutputStream(aFile);
+                          InputStream in = jarFile.getInputStream(entry)) {
+                         IOUtils.copy(in, out);
+                     }
+                 }
+             }
+         } catch (IOException e) {
+             LOG.info("Could not extract {} from {}", dir, jarpath);
+         }
+     }
+ 
+     public static void sendSignalToProcess(long lpid, int signum) throws IOException {
+         String pid = Long.toString(lpid);
+         try {
+             if (isOnWindows()) {
+                 if (signum == SIGKILL) {
+                     execCommand("taskkill", "/f", "/pid", pid);
+                 } else {
+                     execCommand("taskkill", "/pid", pid);
+                 }
+             } else {
+                 execCommand("kill", "-" + signum, pid);
+             }
+         } catch (ExecuteException e) {
+             LOG.info("Error when trying to kill {}. Process is probably already dead.", pid);
+         } catch (IOException e) {
+             LOG.info("IOException Error when trying to kill {}.", pid);
+             throw e;
+         }
+     }
+ 
+     public static void forceKillProcess (String pid) throws IOException {
+         sendSignalToProcess(Long.parseLong(pid), SIGKILL);
+     }
+ 
+     public static void killProcessWithSigTerm (String pid) throws IOException {
+         sendSignalToProcess(Long.parseLong(pid), SIGTERM);
+     }
+ 
+     /**
+      * Adds the user supplied function as a shutdown hook for cleanup.
+      * Also adds a function that sleeps for a second and then halts the
+      * runtime to avoid any zombie process in case cleanup function hangs.
+      */
+     public static void addShutdownHookWithForceKillIn1Sec (Runnable func) {
+         Runnable sleepKill = new Runnable() {
+             @Override
+             public void run() {
+                 try {
+                     Time.sleepSecs(1);
+                     Runtime.getRuntime().halt(20);
+                 } catch (Exception e) {
+                     LOG.warn("Exception in the ShutDownHook", e);
+                 }
+             }
+         };
+         Runtime.getRuntime().addShutdownHook(new Thread(func));
+         Runtime.getRuntime().addShutdownHook(new Thread(sleepKill));
+     }
+ 
+     /**
+      * Returns the combined string, escaped for posix shell.
+      * @param command the list of strings to be combined
+      * @return the resulting command string
+      */
+     public static String shellCmd (List<String> command) {
+         List<String> changedCommands = new ArrayList<>(command.size());
+         for (String str: command) {
+             if (str == null) {
+                 continue;
+             }
+             changedCommands.add("'" + str.replaceAll("'", "'\"'\"'") + "'");
+         }
+         return StringUtils.join(changedCommands, " ");
+     }
+ 
+     public static String scriptFilePath (String dir) {
+         return dir + FILE_PATH_SEPARATOR + "storm-worker-script.sh";
+     }
+ 
+     public static String containerFilePath (String dir) {
+         return dir + FILE_PATH_SEPARATOR + "launch_container.sh";
+     }
+ 
+     public static Object nullToZero (Object v) {
+         return (v != null ? v : 0);
+     }
+ 
+     /**
+      * Deletes a file or directory and its contents if it exists. Does not
+      * complain if the input is null or does not exist.
+      * @param path the path to the file or directory
+      */
+     public static void forceDelete(String path) throws IOException {
+         _instance.forceDeleteImpl(path);
+     }
+ 
+     // Non-static impl methods exist for mocking purposes.
+     protected void forceDeleteImpl(String path) throws IOException {
+         LOG.debug("Deleting path {}", path);
+         if (checkFileExists(path)) {
+             try {
+                 FileUtils.forceDelete(new File(path));
+             } catch (FileNotFoundException ignored) {}
+         }
+     }
+ 
+     /**
+      * Creates a symbolic link to the target
+      * @param dir the parent directory of the link
+      * @param targetDir the parent directory of the link's target
+      * @param targetFilename the file name of the links target
+      * @param filename the file name of the link
+      * @throws IOException
+      */
+     public static void createSymlink(String dir, String targetDir,
+             String targetFilename, String filename) throws IOException {
+         Path path = Paths.get(dir, filename).toAbsolutePath();
+         Path target = Paths.get(targetDir, targetFilename).toAbsolutePath();
+         LOG.debug("Creating symlink [{}] to [{}]", path, target);
+         if (!path.toFile().exists()) {
+             Files.createSymbolicLink(path, target);
+         }
+     }
+ 
+     /**
+      * Convenience method for the case when the link's file name should be the
+      * same as the file name of the target
+      */
+     public static void createSymlink(String dir, String targetDir,
+                                      String targetFilename) throws IOException {
+         Utils.createSymlink(dir, targetDir, targetFilename,
+                             targetFilename);
+     }
+ 
+     /**
+      * Returns a Collection of file names found under the given directory.
+      * @param dir a directory
+      * @return the Collection of file names
+      */
+     public static Collection<String> readDirContents(String dir) {
+         Collection<String> ret = new HashSet<>();
+         File[] files = new File(dir).listFiles();
+         if (files != null) {
+             for (File f: files) {
+                 ret.add(f.getName());
+             }
+         }
+         return ret;
+     }
+ 
+     /**
+      * Returns the value of java.class.path System property. Kept separate for
+      * testing.
+      * @return the classpath
+      */
+     public static String currentClasspath() {
+         return _instance.currentClasspathImpl();
+     }
+ 
+     // Non-static impl methods exist for mocking purposes.
+     public String currentClasspathImpl() {
+         return System.getProperty("java.class.path");
+     }
+ 
+     /**
+      * Returns a collection of jar file names found under the given directory.
+      * @param dir the directory to search
+      * @return the jar file names
+      */
+     private static List<String> getFullJars(String dir) {
+         File[] files = new File(dir).listFiles(jarFilter);
+ 
+         if(files == null) {
+             return new ArrayList<>();
+         }
+ 
+         List<String> ret = new ArrayList<>(files.length);
+         for (File f : files) {
+             ret.add(Paths.get(dir, f.getName()).toString());
+         }
+         return ret;
+     }
+     private static final FilenameFilter jarFilter = new FilenameFilter() {
+             @Override
+             public boolean accept(File dir, String name) {
+                 return name.endsWith(".jar");
+             }
+         };
+ 
+ 
+     public static String workerClasspath() {
+         String stormDir = System.getProperty("storm.home");
+ 
+         if (stormDir == null) {
+             return Utils.currentClasspath();
+         }
+ 
+         String stormLibDir = Paths.get(stormDir, "lib").toString();
+         String stormConfDir =
+                 System.getenv("STORM_CONF_DIR") != null ?
+                 System.getenv("STORM_CONF_DIR") :
+                 Paths.get(stormDir, "conf").toString();
+         String stormExtlibDir = Paths.get(stormDir, "extlib").toString();
+         String extcp = System.getenv("STORM_EXT_CLASSPATH");
+         List<String> pathElements = new LinkedList<>();
+         pathElements.addAll(Utils.getFullJars(stormLibDir));
+         pathElements.addAll(Utils.getFullJars(stormExtlibDir));
+         pathElements.add(extcp);
+         pathElements.add(stormConfDir);
+ 
+         return StringUtils.join(pathElements,
+                 CLASS_PATH_SEPARATOR);
+     }
+ 
+     public static String addToClasspath(String classpath,
+                 Collection<String> paths) {
+         return _instance.addToClasspathImpl(classpath, paths);
+     }
+ 
+     // Non-static impl methods exist for mocking purposes.
+     public String addToClasspathImpl(String classpath,
+                 Collection<String> paths) {
+         if (paths == null || paths.isEmpty()) {
+             return classpath;
+         }
+         List<String> l = new LinkedList<>();
+         l.add(classpath);
+         l.addAll(paths);
+         return StringUtils.join(l, CLASS_PATH_SEPARATOR);
+     }
+ 
+     public static class UptimeComputer {
+         int startTime = 0;
+ 
+         public UptimeComputer() {
+             startTime = Time.currentTimeSecs();
+         }
+ 
+         public int upTime() {
+             return Time.deltaSecs(startTime);
+         }
+     }
+ 
+     public static UptimeComputer makeUptimeComputer() {
+         return _instance.makeUptimeComputerImpl();
+     }
+ 
+     // Non-static impl methods exist for mocking purposes.
+     public UptimeComputer makeUptimeComputerImpl() {
+         return new UptimeComputer();
+     }
+ 
+     /**
+      * Writes a posix shell script file to be executed in its own process.
+      * @param dir the directory under which the script is to be written
+      * @param command the command the script is to execute
+      * @param environment optional environment variables to set before running the script's command. May be  null.
+      * @return the path to the script that has been written
+      */
+     public static String writeScript(String dir, List<String> command,
+                                      Map<String,String> environment) throws IOException {
+         String path = Utils.scriptFilePath(dir);
+         try(BufferedWriter out = new BufferedWriter(new FileWriter(path))) {
+             out.write("#!/bin/bash");
+             out.newLine();
+             if (environment != null) {
+                 for (String k : environment.keySet()) {
+                     String v = environment.get(k);
+                     if (v == null) {
+                         v = "";
+                     }
+                     out.write(Utils.shellCmd(
+                             Arrays.asList(
+                                     "export",k+"="+v)));
+                     out.write(";");
+                     out.newLine();
+                 }
+             }
+             out.newLine();
+             out.write("exec "+Utils.shellCmd(command)+";");
+         }
+         return path;
+     }
+ 
+     /**
+      * A thread that can answer if it is sleeping in the case of simulated time.
+      * This class is not useful when simulated time is not being used.
+      */
+     public static class SmartThread extends Thread {
+         public boolean isSleeping() {
+             return Time.isThreadWaiting(this);
+         }
+         public SmartThread(Runnable r) {
+             super(r);
+         }
+     }
+ 
+     /**
+      * Creates a thread that calls the given code repeatedly, sleeping for an
+      * interval of seconds equal to the return value of the previous call.
+      *
+      * The given afn may be a callable that returns the number of seconds to
+      * sleep, or it may be a Callable that returns another Callable that in turn
+      * returns the number of seconds to sleep. In the latter case isFactory.
+      *
+      * @param afn the code to call on each iteration
+      * @param isDaemon whether the new thread should be a daemon thread
+      * @param eh code to call when afn throws an exception
+      * @param priority the new thread's priority
+      * @param isFactory whether afn returns a callable instead of sleep seconds
+      * @param startImmediately whether to start the thread before returning
+      * @param threadName a suffix to be appended to the thread name
+      * @return the newly created thread
+      * @see java.lang.Thread
+      */
+     public static SmartThread asyncLoop(final Callable afn,
+             boolean isDaemon, final Thread.UncaughtExceptionHandler eh,
+             int priority, final boolean isFactory, boolean startImmediately,
+             String threadName) {
+         SmartThread thread = new SmartThread(new Runnable() {
+             public void run() {
+                 Object s;
+                 try {
+                     Callable fn = isFactory ? (Callable) afn.call() : afn;
+                     while ((s = fn.call()) instanceof Long) {
+                         Time.sleepSecs((Long) s);
+                     }
+                 } catch (Throwable t) {
+                     if (Utils.exceptionCauseIsInstanceOf(
+                             InterruptedException.class, t)) {
+                         LOG.info("Async loop interrupted!");
+                         return;
+                     }
+                     LOG.error("Async loop died!", t);
+                     throw new RuntimeException(t);
+                 }
+             }
+         });
+         if (eh != null) {
+             thread.setUncaughtExceptionHandler(eh);
+         } else {
+             thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+                 public void uncaughtException(Thread t, Throwable e) {
+                     LOG.error("Async loop died!", e);
+                     Utils.exitProcess(1, "Async loop died!");
+                 }
+             });
+         }
+         thread.setDaemon(isDaemon);
+         thread.setPriority(priority);
+         if (threadName != null && !threadName.isEmpty()) {
+             thread.setName(thread.getName() +"-"+ threadName);
+         }
+         if (startImmediately) {
+             thread.start();
+         }
+         return thread;
+     }
+ 
+     /**
+      * Convenience method used when only the function and name suffix are given.
+      * @param afn the code to call on each iteration
+      * @param threadName a suffix to be appended to the thread name
+      * @return the newly created thread
+      * @see java.lang.Thread
+      */
+     public static SmartThread asyncLoop(final Callable afn, String threadName, final Thread.UncaughtExceptionHandler eh) {
+         return asyncLoop(afn, false, eh, Thread.NORM_PRIORITY, false, true,
+                 threadName);
+     }
+ 
+     /**
+      * Convenience method used when only the function is given.
+      * @param afn the code to call on each iteration
+      * @return the newly created thread
+      */
+     public static SmartThread asyncLoop(final Callable afn) {
+         return asyncLoop(afn, false, null, Thread.NORM_PRIORITY, false, true,
+                 null);
+     }
+ 
+     /**
+      * A callback that can accept an integer.
+      * @param <V> the result type of method <code>call</code>
+      */
+     public interface ExitCodeCallable<V> extends Callable<V> {
+         V call(int exitCode);
+     }
+ 
+     /**
+      * Launch a new process as per {@link java.lang.ProcessBuilder} with a given
+      * callback.
+      * @param command the command to be executed in the new process
+      * @param environment the environment to be applied to the process. Can be
+      *                    null.
+      * @param logPrefix a prefix for log entries from the output of the process.
+      *                  Can be null.
+      * @param exitCodeCallback code to be called passing the exit code value
+      *                         when the process completes
+      * @param dir the working directory of the new process
+      * @return the new process
+      * @throws IOException
+      * @see java.lang.ProcessBuilder
+      */
+     public static Process launchProcess(List<String> command,
+                                         Map<String,String> environment,
+                                         final String logPrefix,
+                                         final ExitCodeCallable exitCodeCallback,
+                                         File dir)
+             throws IOException {
+         return _instance.launchProcessImpl(command, environment, logPrefix,
+                 exitCodeCallback, dir);
+     }
+ 
+     public Process launchProcessImpl(
+             List<String> command,
+             Map<String,String> cmdEnv,
+             final String logPrefix,
+             final ExitCodeCallable exitCodeCallback,
+             File dir)
+             throws IOException {
+         ProcessBuilder builder = new ProcessBuilder(command);
+         Map<String,String> procEnv = builder.environment();
+         if (dir != null) {
+             builder.directory(dir);
+         }
+         builder.redirectErrorStream(true);
+         if (cmdEnv != null) {
+             procEnv.putAll(cmdEnv);
+         }
+         final Process process = builder.start();
+         if (logPrefix != null || exitCodeCallback != null) {
+             Utils.asyncLoop(new Callable() {
+                 public Object call() {
+                     if (logPrefix != null ) {
+                         Utils.readAndLogStream(logPrefix,
+                                 process.getInputStream());
+                     }
+                     if (exitCodeCallback != null) {
+                         try {
+                             process.waitFor();
+                         } catch (InterruptedException ie) {
+                             LOG.info("{} interrupted", logPrefix);
+                             exitCodeCallback.call(process.exitValue());
+                         }
+                     }
+                     return null; // Run only once.
+                 }
+             });
+         }
+         return process;
+     }
++
 +    public static long bitXor(Object a, Object b) {
-         return  ((Long) a) ^ ((Long) b);
++        return ((Long) a) ^ ((Long) b);
 +    }
  }
- 


[03/13] storm git commit: [STORM-1245] port backtype.storm.daemon.acker to java

Posted by lo...@apache.org.
[STORM-1245] port backtype.storm.daemon.acker to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e50a312f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e50a312f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e50a312f

Branch: refs/heads/master
Commit: e50a312f1440131e8b9e0cc055d475cbbe711cb9
Parents: 675b0c4
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Feb 3 16:01:43 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Feb 3 16:01:43 2016 +0800

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/acker.clj | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e50a312f/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 9902b35..39e6f55 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -20,7 +20,7 @@
   (:import [java.util List Map])
   (:import [org.apache.storm Constants]
            (org.apache.storm.daemon AckerBolt))
-  (:use [org.apache.storm config util log])
+  (:use [org.apache.storm config util])
   (:gen-class
     :init init
     :implements [org.apache.storm.task.IBolt]
@@ -35,7 +35,6 @@
 (defn mk-acker-bolt []
   (let [output-collector (MutableObject.)
         pending (MutableObject.)]
-    (log-message "Symbol AckerBolt"  (symbol "AckerBolt") )
     (AckerBolt.)))
 
 (defn -init []


[11/13] storm git commit: restore indent change

Posted by lo...@apache.org.
restore indent change


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/14b993a8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/14b993a8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/14b993a8

Branch: refs/heads/master
Commit: 14b993a882cf8d416f144a5a3cf4cb8d87a8811d
Parents: 9603e30
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 22 22:21:20 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 22 22:21:20 2016 +0800

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/testing.clj | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/14b993a8/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index f04befc..804278c 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -691,8 +691,8 @@
                 (increment-global! id# "transferred" 1)
                 (apply transferrer# args2#)))))]
        (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
-                                          (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
-                                            ~@body)))
+                           (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+                             ~@body)))
      (RegisteredGlobalState/clearState id#)))
 
 (defn tracked-wait


[10/13] storm git commit: update according to review comments

Posted by lo...@apache.org.
update according to review comments


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9603e309
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9603e309
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9603e309

Branch: refs/heads/master
Commit: 9603e30961dbb2391bf309031f64c29f808277f7
Parents: f54f2cf
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 22 22:08:43 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 22 22:08:43 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/acker.clj   |  58 ---------
 .../src/clj/org/apache/storm/daemon/common.clj  |  17 ++-
 storm-core/src/clj/org/apache/storm/testing.clj |  11 +-
 .../src/jvm/org/apache/storm/daemon/Acker.java  | 127 +++++++++++++++++++
 .../jvm/org/apache/storm/daemon/AckerBolt.java  | 127 -------------------
 .../src/jvm/org/apache/storm/utils/Utils.java   |   4 +-
 6 files changed, 144 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
deleted file mode 100644
index 9aa15ae..0000000
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ /dev/null
@@ -1,58 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-(ns org.apache.storm.daemon.acker
-  (:import [org.apache.storm.task OutputCollector TopologyContext IBolt]
-           [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.tuple Tuple Fields])
-  (:import [org.apache.storm.utils Container RotatingMap MutableObject])
-  (:import [java.util List Map])
-  (:import [org.apache.storm Constants]
-           (org.apache.storm.daemon AckerBolt))
-  (:use [org.apache.storm config log])
-  (:gen-class
-   :init init
-   :implements [org.apache.storm.task.IBolt]
-   :constructors {[] []}
-   :state state))
-
-(def ACKER-COMPONENT-ID AckerBolt/ACKER_COMPONENT_ID)
-(def ACKER-INIT-STREAM-ID AckerBolt/ACKER_INIT_STREAM_ID)
-(def ACKER-ACK-STREAM-ID AckerBolt/ACKER_ACK_STREAM_ID)
-(def ACKER-FAIL-STREAM-ID AckerBolt/ACKER_FAIL_STREAM_ID)
-
-(defn mk-acker-bolt []
-  (let [output-collector (MutableObject.)
-        pending (MutableObject.)]
-    (AckerBolt.)))
-
-(defn -init []
-  [[] (Container.)])
-
-(defn -prepare [^org.apache.storm.daemon.acker this conf context collector]
-  (let [^IBolt ret (mk-acker-bolt)]
-    (.. this state (set ret))
-    (.prepare ret conf context collector)
-    ))
-
-(defn -execute [^org.apache.storm.daemon.acker this tuple]
-  (let [^IBolt delegate (.. this state (get))]
-    (.execute delegate tuple)
-    ))
-
-(defn -cleanup [^org.apache.storm.daemon.acker this]
-  (let [^IBolt delegate (.. this state (get))]
-    (.cleanup delegate)
-    ))

http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index db7fd40..4076e38 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -29,9 +29,9 @@
   (:import [java.io InterruptedIOException]
            [org.json.simple JSONValue])
   (:import [java.util HashMap])
-  (:import [org.apache.storm Thrift])
+  (:import [org.apache.storm Thrift]
+           (org.apache.storm.daemon Acker))
   (:require [clojure.set :as set])
-  (:require [org.apache.storm.daemon.acker :as acker])
   (:require [metrics.reporters.jmx :as jmx])
   (:require [metrics.core  :refer [default-registry]]))
 
@@ -46,10 +46,10 @@
     (start-metrics-reporter reporter conf)))
 
 
-(def ACKER-COMPONENT-ID acker/ACKER-COMPONENT-ID)
-(def ACKER-INIT-STREAM-ID acker/ACKER-INIT-STREAM-ID)
-(def ACKER-ACK-STREAM-ID acker/ACKER-ACK-STREAM-ID)
-(def ACKER-FAIL-STREAM-ID acker/ACKER-FAIL-STREAM-ID)
+(def ACKER-COMPONENT-ID Acker/ACKER_COMPONENT_ID)
+(def ACKER-INIT-STREAM-ID Acker/ACKER_INIT_STREAM_ID)
+(def ACKER-ACK-STREAM-ID Acker/ACKER_ACK_STREAM_ID)
+(def ACKER-FAIL-STREAM-ID Acker/ACKER_FAIL_STREAM_ID)
 
 (def SYSTEM-STREAM-ID "__system")
 
@@ -222,10 +222,13 @@
                         ))]
     (merge spout-inputs bolt-inputs)))
 
+(defn mk-acker-bolt []
+  (Acker.))
+
 (defn add-acker! [storm-conf ^StormTopology ret]
   (let [num-executors (if (nil? (storm-conf TOPOLOGY-ACKER-EXECUTORS)) (storm-conf TOPOLOGY-WORKERS) (storm-conf TOPOLOGY-ACKER-EXECUTORS))
         acker-bolt (Thrift/prepareSerializedBoltDetails (acker-inputs ret)
-                                                        (new org.apache.storm.daemon.acker)
+                                                        (mk-acker-bolt)
                                                         {ACKER-ACK-STREAM-ID (Thrift/directOutputFields ["id"])
                                                          ACKER-FAIL-STREAM-ID (Thrift/directOutputFields ["id"])
                                                         }

http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/testing.clj b/storm-core/src/clj/org/apache/storm/testing.clj
index 7817929..f04befc 100644
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@ -50,7 +50,6 @@
            (org.apache.storm.messaging IContext)
            [org.json.simple JSONValue])
   (:require [org.apache.storm [zookeeper :as zk]])
-  (:require [org.apache.storm.daemon.acker :as acker])
   (:use [org.apache.storm cluster util config log local-state-converter])
   (:use [org.apache.storm.internal thrift]))
 
@@ -675,9 +674,9 @@
          (.put "transferred" (AtomicInteger. 0))
          (.put "processed" (AtomicInteger. 0))))
      (with-var-roots
-       [acker/mk-acker-bolt
-        (let [old# acker/mk-acker-bolt]
-          (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
+       [common/mk-acker-bolt
+        (let [old# common/mk-acker-bolt]
+         (fn [& args#] (NonRichBoltTracker. (apply old# args#) id#)))
         ;; critical that this particular function is overridden here,
         ;; since the transferred stat needs to be incremented at the moment
         ;; of tuple emission (and not on a separate thread later) for
@@ -692,8 +691,8 @@
                 (increment-global! id# "transferred" 1)
                 (apply transferrer# args2#)))))]
        (with-simulated-time-local-cluster [~cluster-sym ~@cluster-args]
-                           (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
-                             ~@body)))
+                                          (let [~cluster-sym (assoc-track-id ~cluster-sym id#)]
+                                            ~@body)))
      (RegisteredGlobalState/clearState id#)))
 
 (defn tracked-wait

http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/Acker.java b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
new file mode 100644
index 0000000..98f73df
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/daemon/Acker.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.daemon;
+
+import org.apache.storm.task.IBolt;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.RotatingMap;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+public class Acker implements IBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(Acker.class);
+
+    private static final long serialVersionUID = 4430906880683183091L;
+
+    public static final String ACKER_COMPONENT_ID = "__acker";
+    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
+    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
+    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
+
+    public static final int TIMEOUT_BUCKET_NUM = 3;
+
+    private OutputCollector collector;
+    private RotatingMap<Object, AckObject> pending;
+
+    private class AckObject {
+        public long val = 0L;
+        public Integer spoutTask = null;
+        public boolean failed = false;
+
+        // val xor value
+        public void updateAck(Long value) {
+            val = Utils.bitXor(val, value);
+        }
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        if (TupleUtils.isTick(input)) {
+            Map<Object, AckObject> tmp = pending.rotate();
+            LOG.debug("Number of timeout tuples:{}", tmp.size());
+            return;
+        }
+
+        String streamId = input.getSourceStreamId();
+        Object id = input.getValue(0);
+        AckObject curr = pending.get(id);
+        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                curr.spoutTask = input.getInteger(2);
+                pending.put(id, curr);
+            } else {
+                // If receiving bolt's ack before the init message from spout, just update the xor value.
+                curr.updateAck(input.getLong(1));
+                curr.spoutTask = input.getInteger(2);
+            }
+        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
+            if (curr != null) {
+                curr.updateAck(input.getLong(1));
+            } else {
+                curr = new AckObject();
+                curr.val = input.getLong(1);
+                pending.put(id, curr);
+            }
+        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
+            if (curr == null) {
+                // The tuple has been already timeout or failed. So, do nothing
+                return;
+            }
+            curr.failed = true;
+        } else {
+            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
+            return;
+        }
+
+        Integer task = curr.spoutTask;
+        if (task != null) {
+            if (curr.val == 0) {
+                pending.remove(id);
+                collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
+            } else {
+                if (curr.failed) {
+                    pending.remove(id);
+                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
+                }
+            }
+        }
+
+        collector.ack(input);
+    }
+
+    @Override
+    public void cleanup() {
+        LOG.info("Acker: cleanup successfully");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
deleted file mode 100644
index 7c1514f..0000000
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.daemon;
-
-import org.apache.storm.task.IBolt;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Tuple;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.RotatingMap;
-import org.apache.storm.utils.TupleUtils;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public class AckerBolt implements IBolt {
-    private static final Logger LOG = LoggerFactory.getLogger(AckerBolt.class);
-
-    private static final long serialVersionUID = 4430906880683183091L;
-
-    public static final String ACKER_COMPONENT_ID = "__acker";
-    public static final String ACKER_INIT_STREAM_ID = "__ack_init";
-    public static final String ACKER_ACK_STREAM_ID = "__ack_ack";
-    public static final String ACKER_FAIL_STREAM_ID = "__ack_fail";
-
-    public static final int TIMEOUT_BUCKET_NUM = 3;
-
-    private OutputCollector collector;
-    private RotatingMap<Object, AckObject> pending;
-
-    private class AckObject {
-        public long val = 0L;
-        public Integer spoutTask = null;
-        public boolean failed = false;
-
-        // val xor value
-        public void updateAck(Object value) {
-            val = Utils.bitXor(val, value);
-        }
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-        this.pending = new RotatingMap<Object, AckObject>(TIMEOUT_BUCKET_NUM);
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        if (TupleUtils.isTick(input)) {
-            Map<Object, AckObject> tmp = pending.rotate();
-            LOG.debug("Number of timeout tuples:{}", tmp.size());
-            return;
-        }
-
-        String streamId = input.getSourceStreamId();
-        Object id = input.getValue(0);
-        AckObject curr = pending.get(id);
-        if (ACKER_INIT_STREAM_ID.equals(streamId)) {
-            if (curr == null) {
-                curr = new AckObject();
-                curr.val = input.getLong(1);
-                curr.spoutTask = input.getInteger(2);
-                pending.put(id, curr);
-            } else {
-                // If receiving bolt's ack before the init message from spout, just update the xor value.
-                curr.updateAck(input.getLong(1));
-                curr.spoutTask = input.getInteger(2);
-            }
-        } else if (ACKER_ACK_STREAM_ID.equals(streamId)) {
-            if (curr != null) {
-                curr.updateAck(input.getLong(1));
-            } else {
-                curr = new AckObject();
-                curr.val = input.getLong(1);
-                pending.put(id, curr);
-            }
-        } else if (ACKER_FAIL_STREAM_ID.equals(streamId)) {
-            if (curr == null) {
-                // The tuple has been already timeout or failed. So, do nothing
-                return;
-            }
-            curr.failed = true;
-        } else {
-            LOG.warn("Unknown source stream {} from task-{}", streamId, input.getSourceTask());
-            return;
-        }
-
-        Integer task = curr.spoutTask;
-        if (task != null) {
-            if (curr.val == 0) {
-                pending.remove(id);
-                collector.emitDirect(task, ACKER_ACK_STREAM_ID, new Values(id));
-            } else {
-                if (curr.failed) {
-                    pending.remove(id);
-                    collector.emitDirect(task, ACKER_FAIL_STREAM_ID, new Values(id));
-                }
-            }
-        }
-
-        collector.ack(input);
-    }
-
-    @Override
-    public void cleanup() {
-        LOG.info("Acker: cleanup successfully");
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/9603e309/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index c2c5e62..43c00fc 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -2278,7 +2278,7 @@ public class Utils {
         return process;
     }
 
-    public static long bitXor(Object a, Object b) {
-        return ((Long) a) ^ ((Long) b);
+    public static long bitXor(Long a, Long b) {
+        return a ^ b;
     }
 }


[09/13] storm git commit: Merge remote-tracking branch 'upstream/master'

Posted by lo...@apache.org.
Merge remote-tracking branch 'upstream/master'


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f54f2cf4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f54f2cf4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f54f2cf4

Branch: refs/heads/master
Commit: f54f2cf4e72091fd75c2b3f365caf3dcf31735e1
Parents: 4e26f06 4ca7522
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Mon Feb 22 10:22:22 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Mon Feb 22 10:22:22 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  18 +
 README.markdown                                 |   1 +
 bin/storm-config.cmd                            |   4 +
 bin/storm.cmd                                   |  24 +-
 bin/storm.py                                    |  12 +-
 conf/cgconfig.conf.example                      |  41 +++
 conf/defaults.yaml                              |  16 +-
 examples/storm-starter/pom.xml                  |  10 +
 .../org/apache/storm/starter/clj/word_count.clj |   3 +-
 .../starter/ResourceAwareExampleTopology.java   |   2 +-
 .../spout/RandomNumberGeneratorSpout.java       |  95 +++++
 .../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++
 .../TridentMinMaxOfVehiclesTopology.java        | 180 ++++++++++
 external/storm-hdfs/pom.xml                     |  23 +-
 pom.xml                                         |  16 +
 storm-clojure/pom.xml                           |  74 ++++
 .../src/clj/org/apache/storm/clojure.clj        | 207 +++++++++++
 .../src/clj/org/apache/storm/thrift.clj         | 286 +++++++++++++++
 storm-clojure/src/test/clj/clojure_test.clj     | 158 +++++++++
 storm-core/pom.xml                              |   9 +
 storm-core/src/clj/org/apache/storm/clojure.clj | 207 -----------
 .../clj/org/apache/storm/command/activate.clj   |  24 --
 .../clj/org/apache/storm/command/deactivate.clj |  24 --
 .../org/apache/storm/command/dev_zookeeper.clj  |  28 --
 .../clj/org/apache/storm/command/get_errors.clj |   3 +-
 .../org/apache/storm/command/healthcheck.clj    |  90 -----
 .../org/apache/storm/command/kill_topology.clj  |  29 --
 .../src/clj/org/apache/storm/command/list.clj   |  38 --
 .../clj/org/apache/storm/command/monitor.clj    |   2 +-
 .../clj/org/apache/storm/command/rebalance.clj  |   3 +-
 .../org/apache/storm/command/set_log_level.clj  |   3 +-
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/daemon/common.clj  | 121 ++++---
 .../clj/org/apache/storm/daemon/executor.clj    | 114 +++---
 .../clj/org/apache/storm/daemon/logviewer.clj   |  19 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 104 +++---
 .../clj/org/apache/storm/daemon/supervisor.clj  | 251 +++++++++----
 .../src/clj/org/apache/storm/daemon/task.clj    |   4 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  | 170 +++++----
 .../src/clj/org/apache/storm/disruptor.clj      |  89 -----
 storm-core/src/clj/org/apache/storm/event.clj   |  71 ----
 .../clj/org/apache/storm/internal/clojure.clj   | 201 +++++++++++
 .../clj/org/apache/storm/internal/thrift.clj    |  96 +++++
 .../src/clj/org/apache/storm/local_state.clj    | 134 -------
 .../org/apache/storm/local_state_converter.clj  |  24 ++
 storm-core/src/clj/org/apache/storm/testing.clj |  37 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  | 286 ---------------
 storm-core/src/clj/org/apache/storm/timer.clj   | 128 -------
 storm-core/src/clj/org/apache/storm/ui/core.clj |   8 +-
 storm-core/src/jvm/org/apache/storm/Config.java |  88 +++++
 .../src/jvm/org/apache/storm/StormTimer.java    | 241 +++++++++++++
 storm-core/src/jvm/org/apache/storm/Thrift.java | 351 ++++++++++++++++++
 .../jvm/org/apache/storm/command/Activate.java  |  40 +++
 .../src/jvm/org/apache/storm/command/CLI.java   | 353 +++++++++++++++++++
 .../org/apache/storm/command/Deactivate.java    |  40 +++
 .../org/apache/storm/command/DevZookeeper.java  |  35 ++
 .../org/apache/storm/command/HealthCheck.java   | 125 +++++++
 .../org/apache/storm/command/KillTopology.java  |  51 +++
 .../src/jvm/org/apache/storm/command/List.java  |  50 +++
 .../container/ResourceIsolationInterface.java   |  51 +++
 .../storm/container/cgroup/CgroupCenter.java    | 216 ++++++++++++
 .../storm/container/cgroup/CgroupCommon.java    | 270 ++++++++++++++
 .../container/cgroup/CgroupCommonOperation.java |  81 +++++
 .../container/cgroup/CgroupCoreFactory.java     |  74 ++++
 .../storm/container/cgroup/CgroupManager.java   | 210 +++++++++++
 .../storm/container/cgroup/CgroupOperation.java |  79 +++++
 .../storm/container/cgroup/CgroupUtils.java     | 118 +++++++
 .../apache/storm/container/cgroup/Device.java   |  75 ++++
 .../storm/container/cgroup/Hierarchy.java       | 130 +++++++
 .../storm/container/cgroup/SubSystem.java       |  81 +++++
 .../storm/container/cgroup/SubSystemType.java   |  36 ++
 .../storm/container/cgroup/SystemOperation.java |  75 ++++
 .../storm/container/cgroup/core/BlkioCore.java  | 213 +++++++++++
 .../storm/container/cgroup/core/CgroupCore.java |  26 ++
 .../storm/container/cgroup/core/CpuCore.java    | 135 +++++++
 .../container/cgroup/core/CpuacctCore.java      |  71 ++++
 .../storm/container/cgroup/core/CpusetCore.java | 209 +++++++++++
 .../container/cgroup/core/DevicesCore.java      | 189 ++++++++++
 .../container/cgroup/core/FreezerCore.java      |  66 ++++
 .../storm/container/cgroup/core/MemoryCore.java | 188 ++++++++++
 .../storm/container/cgroup/core/NetClsCore.java |  69 ++++
 .../container/cgroup/core/NetPrioCore.java      |  65 ++++
 .../org/apache/storm/event/EventManager.java    |  24 ++
 .../org/apache/storm/event/EventManagerImp.java |  97 +++++
 .../jvm/org/apache/storm/testing/NGrouping.java |   4 +-
 .../storm/testing/PythonShellMetricsBolt.java   |  14 +-
 .../storm/testing/PythonShellMetricsSpout.java  |   8 +-
 .../jvm/org/apache/storm/trident/Stream.java    | 121 ++++++-
 .../operation/builtin/ComparisonAggregator.java |  91 +++++
 .../storm/trident/operation/builtin/Max.java    |  37 ++
 .../operation/builtin/MaxWithComparator.java    |  51 +++
 .../storm/trident/operation/builtin/Min.java    |  36 ++
 .../operation/builtin/MinWithComparator.java    |  51 +++
 .../org/apache/storm/utils/DisruptorQueue.java  |  15 +-
 .../jvm/org/apache/storm/utils/LocalState.java  | 112 +++++-
 .../org/apache/storm/utils/NimbusClient.java    |  19 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |  37 +-
 .../org/apache/storm/integration_test.clj       | 259 ++++++++------
 .../org/apache/storm/testing4j_test.clj         |  72 ++--
 .../test/clj/org/apache/storm/clojure_test.clj  |  64 ++--
 .../test/clj/org/apache/storm/cluster_test.clj  |   3 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../test/clj/org/apache/storm/grouping_test.clj |  56 +--
 .../storm/messaging/netty_integration_test.clj  |  18 +-
 .../clj/org/apache/storm/messaging_test.clj     |  14 +-
 .../test/clj/org/apache/storm/metrics_test.clj  |  85 +++--
 .../test/clj/org/apache/storm/nimbus_test.clj   | 260 +++++++++-----
 .../scheduler/resource_aware_scheduler_test.clj |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 175 ++++-----
 .../clj/org/apache/storm/tick_tuple_test.clj    |  15 +-
 .../clj/org/apache/storm/transactional_test.clj |   3 +-
 .../test/jvm/org/apache/storm/TestCgroups.java  | 130 +++++++
 .../jvm/org/apache/storm/command/TestCLI.java   |  59 ++++
 .../resource/TestResourceAwareScheduler.java    |   3 +
 114 files changed, 7747 insertions(+), 2003 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f54f2cf4/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------


[12/13] storm git commit: Merge remote-tracking branch 'upstream/master'

Posted by lo...@apache.org.
Merge remote-tracking branch 'upstream/master'

Conflicts:
	storm-core/src/clj/org/apache/storm/testing.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6e433c83
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6e433c83
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6e433c83

Branch: refs/heads/master
Commit: 6e433c83c9ab581d8cbe9f7b42b97ffb8e25a0b4
Parents: 14b993a 07629c1
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 28 20:23:29 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 28 20:23:29 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  21 +-
 README.markdown                                 |   3 +-
 bin/flight.bash                                 |   4 +-
 bin/storm.cmd                                   |   2 +-
 bin/storm.py                                    |   6 +-
 conf/defaults.yaml                              |   4 +-
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |  10 +-
 .../storm/hdfs/bolt/AvroGenericRecordBolt.java  |   8 +-
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    |   8 +-
 .../storm/hdfs/bolt/SequenceFileBolt.java       |   8 +-
 .../org/apache/storm/kafka/IntSerializer.java   |  10 +-
 .../apache/storm/kafka/PartitionManager.java    |   5 +-
 .../kafka/trident/TridentKafkaEmitter.java      |   5 +-
 .../src/clj/org/apache/storm/thrift.clj         |   2 +-
 storm-core/src/clj/org/apache/storm/cluster.clj | 700 -------------------
 .../cluster_state/zookeeper_state_factory.clj   | 165 -----
 .../clj/org/apache/storm/command/heartbeats.clj |   6 +-
 .../clj/org/apache/storm/command/monitor.clj    |  37 -
 .../clj/org/apache/storm/command/rebalance.clj  |  47 --
 .../org/apache/storm/command/set_log_level.clj  |  76 --
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/converter.clj      |  23 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  12 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  38 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  13 +-
 .../clj/org/apache/storm/daemon/logviewer.clj   |  65 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 165 +++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  46 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  68 +-
 .../clj/org/apache/storm/internal/thrift.clj    |   2 +-
 .../storm/pacemaker/pacemaker_state_factory.clj | 141 ----
 .../clj/org/apache/storm/process_simulator.clj  |  49 --
 storm-core/src/clj/org/apache/storm/stats.clj   |   3 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  21 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj | 128 ++--
 .../src/clj/org/apache/storm/ui/helpers.clj     | 199 +-----
 storm-core/src/clj/org/apache/storm/util.clj    |  11 +
 .../src/clj/org/apache/storm/zookeeper.clj      |  74 --
 storm-core/src/jvm/org/apache/storm/Config.java |   9 +
 .../jvm/org/apache/storm/ProcessSimulator.java  |  82 +++
 .../storm/blobstore/LocalFsBlobStore.java       |   2 +-
 .../jvm/org/apache/storm/callback/Callback.java |  23 -
 .../storm/callback/ZKStateChangedCallback.java  |  25 +
 .../org/apache/storm/cluster/ClusterState.java  | 217 ------
 .../storm/cluster/ClusterStateContext.java      |   2 +-
 .../storm/cluster/ClusterStateFactory.java      |  28 -
 .../org/apache/storm/cluster/ClusterUtils.java  | 244 +++++++
 .../org/apache/storm/cluster/ExecutorBeat.java  |  44 ++
 .../org/apache/storm/cluster/IStateStorage.java | 222 ++++++
 .../storm/cluster/IStormClusterState.java       | 124 ++++
 .../storm/cluster/PaceMakerStateStorage.java    | 216 ++++++
 .../cluster/PaceMakerStateStorageFactory.java   |  64 ++
 .../storm/cluster/StateStorageFactory.java      |  28 +
 .../storm/cluster/StormClusterStateImpl.java    | 692 ++++++++++++++++++
 .../apache/storm/cluster/ZKStateStorage.java    | 244 +++++++
 .../storm/cluster/ZKStateStorageFactory.java    |  36 +
 .../src/jvm/org/apache/storm/command/CLI.java   |  34 +-
 .../jvm/org/apache/storm/command/Monitor.java   |  65 ++
 .../jvm/org/apache/storm/command/Rebalance.java |  86 +++
 .../org/apache/storm/command/SetLogLevel.java   | 116 +++
 .../storm/metric/FileBasedEventLogger.java      |  18 +-
 .../apache/storm/pacemaker/PacemakerClient.java |   6 +-
 .../security/auth/ThriftConnectionType.java     |   2 +-
 .../serialization/SerializationFactory.java     |   2 +
 .../testing/staticmocking/MockedCluster.java    |  31 +
 .../MockedPaceMakerStateStorageFactory.java     |  32 +
 .../apache/storm/topology/TopologyBuilder.java  |  13 +-
 .../apache/storm/trident/tuple/ConsList.java    |  20 +-
 .../apache/storm/ui/FilterConfiguration.java    |  63 ++
 .../jvm/org/apache/storm/ui/IConfigurator.java  |  24 +
 .../src/jvm/org/apache/storm/ui/UIHelpers.java  | 267 +++++++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |   2 +-
 .../src/jvm/org/apache/storm/utils/Time.java    |   1 +
 .../src/jvm/org/apache/storm/utils/Utils.java   |  78 ++-
 .../storm/utils/WorkerBackpressureCallback.java |   2 +-
 .../storm/utils/WorkerBackpressureThread.java   |  38 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |  77 +-
 storm-core/src/ui/public/component.html         |   2 +-
 .../templates/topology-page-template.html       |   6 +-
 storm-core/src/ui/public/topology.html          |   2 +-
 .../org/apache/storm/integration_test.clj       |  13 +-
 .../test/clj/org/apache/storm/cluster_test.clj  | 202 +++---
 .../storm/messaging/netty_integration_test.clj  |   1 +
 .../test/clj/org/apache/storm/nimbus_test.clj   | 158 +++--
 .../storm/pacemaker_state_factory_test.clj      | 121 ++--
 .../storm/security/auth/nimbus_auth_test.clj    |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 163 +++--
 .../test/clj/org/apache/storm/utils_test.clj    | 111 ---
 .../org/apache/storm/command/RebalanceTest.java |  41 ++
 .../apache/storm/command/SetLogLevelTest.java   |  54 ++
 .../jvm/org/apache/storm/command/TestCLI.java   |  44 +-
 .../storm/topology/TopologyBuilderTest.java     |  65 ++
 .../jvm/org/apache/storm/utils/TimeTest.java    | 112 +++
 .../jvm/org/apache/storm/utils/UtilsTest.java   | 219 ++++++
 .../utils/WorkerBackpressureThreadTest.java     |  50 ++
 .../storm/utils/staticmocking/package-info.java |   2 +-
 96 files changed, 4198 insertions(+), 2637 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6e433c83/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/6e433c83/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index 804278c,8242c3e..66fc051
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -49,8 -50,9 +50,8 @@@
    (:import [org.apache.storm.task TopologyContext]
             (org.apache.storm.messaging IContext)
             [org.json.simple JSONValue])
-   (:require [org.apache.storm [zookeeper :as zk]])
-   (:use [org.apache.storm cluster util config log local-state-converter])
+   (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils])
 -  (:require [org.apache.storm.daemon.acker :as acker])
+   (:use [org.apache.storm util config log local-state-converter converter])
    (:use [org.apache.storm.internal thrift]))
  
  (defn feeder-spout

http://git-wip-us.apache.org/repos/asf/storm/blob/6e433c83/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------


[08/13] storm git commit: Revert some code format problems caused by auto merging

Posted by lo...@apache.org.
Revert some code format problems caused by auto merging


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4e26f06b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4e26f06b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4e26f06b

Branch: refs/heads/master
Commit: 4e26f06b2d7d343953985ed611b58193c4246bbb
Parents: 6c0f4f0
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Sun Feb 14 14:33:44 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Sun Feb 14 14:33:44 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/acker.clj   |  3 ++-
 .../src/clj/org/apache/storm/daemon/common.clj  |  6 ++---
 .../src/jvm/org/apache/storm/utils/Utils.java   | 24 ++++++++++----------
 3 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4e26f06b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/acker.clj b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
index 7e17d40..9aa15ae 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/acker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/acker.clj
@@ -44,7 +44,8 @@
 (defn -prepare [^org.apache.storm.daemon.acker this conf context collector]
   (let [^IBolt ret (mk-acker-bolt)]
     (.. this state (set ret))
-    (.prepare ret conf context collector)))
+    (.prepare ret conf context collector)
+    ))
 
 (defn -execute [^org.apache.storm.daemon.acker this tuple]
   (let [^IBolt delegate (.. this state (get))]

http://git-wip-us.apache.org/repos/asf/storm/blob/4e26f06b/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 42fa1fa..eb1ec1e 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -28,7 +28,7 @@
   (:import [org.apache.storm.security.auth IAuthorizer])
   (:import [java.io InterruptedIOException]
            [org.json.simple JSONValue])
-  (:require [clojure.set :as set])
+  (:require [clojure.set :as set])  
   (:require [org.apache.storm.daemon.acker :as acker])
   (:require [org.apache.storm.thrift :as thrift])
   (:require [metrics.core  :refer [default-registry]]))
@@ -144,8 +144,8 @@
 
 (defn component-conf [component]
   (->> component
-      .get_common
-      .get_json_conf
+       .get_common
+       .get_json_conf
        (#(if % (JSONValue/parse %)))
        clojurify-structure))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/4e26f06b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index 44fb1a1..d098d83 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -512,7 +512,7 @@ public class Utils {
 
         if(store != null) {
             // store can be null during testing when mocking utils.
-        store.prepare(nconf, baseDir, nimbusInfo);
+            store.prepare(nconf, baseDir, nimbusInfo);
         }
         return store;
     }
@@ -946,17 +946,17 @@ public class Utils {
                 inputStream = new BufferedInputStream(new FileInputStream(inFile));
             }
             try (TarArchiveInputStream tis = new TarArchiveInputStream(inputStream)) {
-            for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
-                unpackEntries(tis, entry, untarDir);
-                entry = tis.getNextTarEntry();
-            }
+                for (TarArchiveEntry entry = tis.getNextTarEntry(); entry != null; ) {
+                    unpackEntries(tis, entry, untarDir);
+                    entry = tis.getNextTarEntry();
+                }
             }
         } finally {
             if(inputStream != null) {
                 inputStream.close();
+            }
         }
     }
-                }
 
     private static void unpackEntries(TarArchiveInputStream tis,
                                       TarArchiveEntry entry, File outputDir) throws IOException {
@@ -975,7 +975,7 @@ public class Utils {
         if (!outputFile.getParentFile().exists()) {
             if (!outputFile.getParentFile().mkdirs()) {
                 throw new IOException("Mkdirs failed to create tar internal dir "
-                        + outputDir);
+                                      + outputDir);
             }
         }
         int count;
@@ -1190,11 +1190,11 @@ public class Utils {
         }
         String stormZKUser = (String)conf.get(Config.STORM_ZOOKEEPER_SUPERACL);
         if (stormZKUser == null) {
-            throw new IllegalArgumentException("Authentication is enabled but "+Config.STORM_ZOOKEEPER_SUPERACL+" is not set");
+            throw new IllegalArgumentException("Authentication is enabled but " + Config.STORM_ZOOKEEPER_SUPERACL + " is not set");
         }
-        String[] split = stormZKUser.split(":",2);
+        String[] split = stormZKUser.split(":", 2);
         if (split.length != 2) {
-            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL+" does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
+            throw new IllegalArgumentException(Config.STORM_ZOOKEEPER_SUPERACL + " does not appear to be in the form scheme:acl, i.e. sasl:storm-user");
         }
         ArrayList<ACL> ret = new ArrayList<ACL>(ZooDefs.Ids.CREATOR_ALL_ACL);
         ret.add(new ACL(ZooDefs.Perms.ALL, new Id(split[0], split[1])));
@@ -1258,7 +1258,7 @@ public class Utils {
         return dump.toString();
     }
 
-    /*
+    /**
      * Creates an instance of the pluggable SerializationDelegate or falls back to
      * DefaultSerializationDelegate if something goes wrong.
      * @param stormConf The config from which to pull the name of the pluggable class.
@@ -1316,7 +1316,7 @@ public class Utils {
                         if (!file.getParentFile().mkdirs()) {
                             if (!file.getParentFile().isDirectory()) {
                                 throw new IOException("Mkdirs failed to create " +
-                                        file.getParentFile().toString());
+                                                      file.getParentFile().toString());
                             }
                         }
                         OutputStream out = new FileOutputStream(file);


[04/13] storm git commit: Acker bolt should return after processing timeout tick tuple

Posted by lo...@apache.org.
Acker bolt should return after processing timeout tick tuple


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c4dfa33c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c4dfa33c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c4dfa33c

Branch: refs/heads/master
Commit: c4dfa33c58ac8e1d3c6197b86b2976b9669a39f3
Parents: e50a312
Author: basti.lj <ba...@alibaba-inc.com>
Authored: Wed Feb 3 18:43:17 2016 +0800
Committer: basti.lj <ba...@alibaba-inc.com>
Committed: Wed Feb 3 18:43:17 2016 +0800

----------------------------------------------------------------------
 storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c4dfa33c/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
index 80ed4ca..a4f6815 100644
--- a/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
+++ b/storm-core/src/jvm/org/apache/storm/daemon/AckerBolt.java
@@ -67,6 +67,7 @@ public class AckerBolt implements IBolt {
         if (TupleUtils.isTick(input)) {
             Map<Object, AckObject> tmp = pending.rotate();
             LOG.debug("Number of timeout tuples:{}", tmp.size());
+            return;
         }
 
         String streamId = input.getSourceStreamId();
@@ -115,8 +116,6 @@ public class AckerBolt implements IBolt {
                     collector.emitDirect(task, ACKER_FAIL_STREAM_ID, values);
                 }
             }
-        } else {
-
         }
 
         collector.ack(input);