You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/06/01 19:37:55 UTC
[20/50] storm git commit: Apply addressed comments from review
Apply addressed comments from review
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/85c5096e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/85c5096e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/85c5096e
Branch: refs/heads/0.10.x-branch
Commit: 85c5096e0806ac872739d8b73574a8f1e5d31679
Parents: 52bd47b
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu May 21 06:30:23 2015 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu May 21 06:30:23 2015 +0900
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/worker.clj | 4 ++--
.../backtype/storm/utils/TransferDrainer.java | 20 +++++++++-----------
2 files changed, 11 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/85c5096e/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 6da8692..fe64474 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -129,12 +129,12 @@
(.add local pair)
;;Using java objects directly to avoid performance issues in java code
- (let []
+ (do
(when (not (.get remoteMap task))
(.put remoteMap task (ArrayList.)))
(let [remote (.get remoteMap task)]
(.add remote (TaskMessage. task (.serialize serializer tuple)))
- ))))
+ ))))
(local-transfer local)
(disruptor/publish transfer-queue remoteMap)
))]
http://git-wip-us.apache.org/repos/asf/storm/blob/85c5096e/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
index 6130bfe..5a111ce 100644
--- a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -39,14 +39,12 @@ public class TransferDrainer {
HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
for (String hostPort : bundleMapByDestination.keySet()) {
- if (hostPort != null) {
- IConnection connection = connections.get(hostPort);
- if (null != connection) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
- Iterator<TaskMessage> iter = getBundleIterator(bundle);
- if (null != iter && iter.hasNext()) {
- connection.send(iter);
- }
+ IConnection connection = connections.get(hostPort);
+ if (null != connection) {
+ ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
+ Iterator<TaskMessage> iter = getBundleIterator(bundle);
+ if (null != iter && iter.hasNext()) {
+ connection.send(iter);
}
}
}
@@ -65,13 +63,13 @@ public class TransferDrainer {
return bundleMap;
}
- private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles,
+ private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
T key, ArrayList<TaskMessage> tuples) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+ ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);
if (null == bundle) {
bundle = new ArrayList<ArrayList<TaskMessage>>();
- bundles.put(key, bundle);
+ bundleMap.put(key, bundle);
}
if (null != tuples && tuples.size() > 0) {