You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 19:55:47 UTC

[2/6] storm git commit: Change TransferDrainer to re-group msg by destination when sending

Change TransferDrainer to re-group msg by destination when sending


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85af1950
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85af1950
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85af1950

Branch: refs/heads/master
Commit: 85af195049fd1229acc62a1b8638415c06b6cf9d
Parents: a1d7b3e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu May 14 06:21:32 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu May 14 06:21:32 2015 +0900

----------------------------------------------------------------------
 .../backtype/storm/utils/TransferDrainer.java   | 53 +++++++++++++-------
 1 file changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/85af1950/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 20b4545..6130bfe 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -23,34 +23,26 @@ import java.util.Iterator;
 
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
+import com.google.common.collect.Maps;
 
 public class TransferDrainer {
 
   private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
   
   public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
-    for (Integer key : taskTupleSetMap.keySet()) {
-      
-      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
-      if (null == bundle) {
-        bundle = new ArrayList<ArrayList<TaskMessage>>();
-        bundles.put(key, bundle);
-      }
-      
-      ArrayList tupleSet = taskTupleSetMap.get(key);
-      if (null != tupleSet && tupleSet.size() > 0) {
-        bundle.add(tupleSet);
-      }
-    } 
+    for (Integer task : taskTupleSetMap.keySet()) {
+      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
+    }
   }
   
   public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
-    for (Integer task : bundles.keySet()) {
-      String hostPort = taskToNode.get(task);
+    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+    for (String hostPort : bundleMapByDestination.keySet()) {
       if (hostPort != null) {
         IConnection connection = connections.get(hostPort);
         if (null != connection) {
-          ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(task);
+          ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
           Iterator<TaskMessage> iter = getBundleIterator(bundle);
           if (null != iter && iter.hasNext()) {
             connection.send(iter);
@@ -59,7 +51,34 @@ public class TransferDrainer {
       }
     }
   }
-  
+
+  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
+    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
+    for (Integer task : this.bundles.keySet()) {
+      String hostPort = taskToNode.get(task);
+      if (hostPort != null) {
+        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
+          addListRefToMap(bundleMap, hostPort, chunk);
+        }
+      }
+    }
+    return bundleMap;
+  }
+
+  private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles,
+                                   T key, ArrayList<TaskMessage> tuples) {
+    ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+
+    if (null == bundle) {
+      bundle = new ArrayList<ArrayList<TaskMessage>>();
+      bundles.put(key, bundle);
+    }
+
+    if (null != tuples && tuples.size() > 0) {
+      bundle.add(tuples);
+    }
+  }
+
   private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
     
     if (null == bundle) {