You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 20:10:48 UTC

[1/2] storm git commit: Merge branch 'STORM-737' of github.com:HeartSaVioR/storm

Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch ed2e89117 -> 7e5e5f0e3


Merge branch 'STORM-737' of github.com:HeartSaVioR/storm

Conflicts:
	storm-core/src/clj/backtype/storm/daemon/worker.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/acc1a79d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/acc1a79d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/acc1a79d

Branch: refs/heads/0.10.x-branch
Commit: acc1a79de17802c59810193311aa182b963e8511
Parents: ed2e891
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 13:53:39 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 13:58:23 2015 -0400

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 13 ++--
 .../backtype/storm/utils/TransferDrainer.java   | 62 +++++++++++++-------
 2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/acc1a79d/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index a83146b..bd5b9c1 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -129,10 +129,10 @@
                   (.add local pair) 
 
                   ;;Using java objects directly to avoid performance issues in java code
-                  (let [node+port (get @task->node+port task)]
-                    (when (not (.get remoteMap node+port))
-                      (.put remoteMap node+port (ArrayList.)))
-                    (let [remote (.get remoteMap node+port)]
+                  (do
+                    (when (not (.get remoteMap task))
+                      (.put remoteMap task (ArrayList.)))
+                    (let [remote (.get remoteMap task)]
                       (if (not-nil? task)
                         (.add remote (TaskMessage. task (.serialize serializer tuple)))
                         (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
@@ -342,8 +342,9 @@
         
         (when batch-end?
           (read-locked endpoint-socket-lock
-            (let [node+port->socket @node+port->socket]
-              (.send drainer node+port->socket)))
+             (let [node+port->socket @node+port->socket
+                   task->node+port @task->node+port]
+               (.send drainer task->node+port node+port->socket)))
           (.clear drainer))))))
 
 ;; Check whether this messaging connection is ready to send data

http://git-wip-us.apache.org/repos/asf/storm/blob/acc1a79d/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 0e53632..5a111ce 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -23,40 +23,60 @@ import java.util.Iterator;
 
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
+import com.google.common.collect.Maps;
 
 public class TransferDrainer {
 
-  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
+  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
   
-  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
-    for (String key : workerTupleSetMap.keySet()) {
-      
-      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
-      if (null == bundle) {
-        bundle = new ArrayList<ArrayList<TaskMessage>>();
-        bundles.put(key, bundle);
-      }
-      
-      ArrayList tupleSet = workerTupleSetMap.get(key);
-      if (null != tupleSet && tupleSet.size() > 0) {
-        bundle.add(tupleSet);
-      }
-    } 
+  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
+    for (Integer task : taskTupleSetMap.keySet()) {
+      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
+    }
   }
   
-  public void send(HashMap<String, IConnection> connections) {
-    for (String hostPort : bundles.keySet()) {
+  public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
+    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+    for (String hostPort : bundleMapByDestination.keySet()) {
       IConnection connection = connections.get(hostPort);
-      if (null != connection) { 
-        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+      if (null != connection) {
+        ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
         Iterator<TaskMessage> iter = getBundleIterator(bundle);
         if (null != iter && iter.hasNext()) {
           connection.send(iter);
         }
       }
-    } 
+    }
   }
-  
+
+  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
+    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
+    for (Integer task : this.bundles.keySet()) {
+      String hostPort = taskToNode.get(task);
+      if (hostPort != null) {
+        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
+          addListRefToMap(bundleMap, hostPort, chunk);
+        }
+      }
+    }
+    return bundleMap;
+  }
+
+  private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
+                                   T key, ArrayList<TaskMessage> tuples) {
+    ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);
+
+    if (null == bundle) {
+      bundle = new ArrayList<ArrayList<TaskMessage>>();
+      bundleMap.put(key, bundle);
+    }
+
+    if (null != tuples && tuples.size() > 0) {
+      bundle.add(tuples);
+    }
+  }
+
   private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
     
     if (null == bundle) {


[2/2] storm git commit: add STORM-737 to changelog

Posted by pt...@apache.org.
add STORM-737 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7e5e5f0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7e5e5f0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7e5e5f0e

Branch: refs/heads/0.10.x-branch
Commit: 7e5e5f0e38069a069d560878aeb88e1a5f8015f8
Parents: acc1a79
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 14:00:56 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 14:00:56 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7e5e5f0e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 50dcb2d..b0b8d0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-737: Check task->node+port with read lock to prevent sending to closed connection
  * STORM-835 Netty Client hold batch object until io operation complete
  * STORM-827: Allow AutoTGT to work with storm-hdfs too.
  * STORM-728: Put emitted and transferred stats under correct columns