You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 19:55:47 UTC
[2/6] storm git commit: Change TransferDrainer to re-group msg by destination when sending
Change TransferDrainer to re-group msg by destination when sending
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85af1950
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85af1950
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85af1950
Branch: refs/heads/master
Commit: 85af195049fd1229acc62a1b8638415c06b6cf9d
Parents: a1d7b3e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu May 14 06:21:32 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu May 14 06:21:32 2015 +0900
----------------------------------------------------------------------
.../backtype/storm/utils/TransferDrainer.java | 53 +++++++++++++-------
1 file changed, 36 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/85af1950/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 20b4545..6130bfe 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -23,34 +23,26 @@ import java.util.Iterator;
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
+import com.google.common.collect.Maps;
public class TransferDrainer {
private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
- for (Integer key : taskTupleSetMap.keySet()) {
-
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
- if (null == bundle) {
- bundle = new ArrayList<ArrayList<TaskMessage>>();
- bundles.put(key, bundle);
- }
-
- ArrayList tupleSet = taskTupleSetMap.get(key);
- if (null != tupleSet && tupleSet.size() > 0) {
- bundle.add(tupleSet);
- }
- }
+ for (Integer task : taskTupleSetMap.keySet()) {
+ addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
+ }
}
public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
- for (Integer task : bundles.keySet()) {
- String hostPort = taskToNode.get(task);
+ HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+ for (String hostPort : bundleMapByDestination.keySet()) {
if (hostPort != null) {
IConnection connection = connections.get(hostPort);
if (null != connection) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(task);
+ ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
Iterator<TaskMessage> iter = getBundleIterator(bundle);
if (null != iter && iter.hasNext()) {
connection.send(iter);
@@ -59,7 +51,34 @@ public class TransferDrainer {
}
}
}
-
+
+ private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
+ HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
+ for (Integer task : this.bundles.keySet()) {
+ String hostPort = taskToNode.get(task);
+ if (hostPort != null) {
+ for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
+ addListRefToMap(bundleMap, hostPort, chunk);
+ }
+ }
+ }
+ return bundleMap;
+ }
+
+ private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles,
+ T key, ArrayList<TaskMessage> tuples) {
+ ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+
+ if (null == bundle) {
+ bundle = new ArrayList<ArrayList<TaskMessage>>();
+ bundles.put(key, bundle);
+ }
+
+ if (null != tuples && tuples.size() > 0) {
+ bundle.add(tuples);
+ }
+ }
+
private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
if (null == bundle) {