You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 19:55:50 UTC
[5/6] storm git commit: Merge branch 'STORM-737' of
github.com:HeartSaVioR/storm
Merge branch 'STORM-737' of github.com:HeartSaVioR/storm
Conflicts:
storm-core/src/clj/backtype/storm/daemon/worker.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4073dbe6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4073dbe6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4073dbe6
Branch: refs/heads/master
Commit: 4073dbe6cc98c55b9705888ae697778a9974ad30
Parents: 3f50b72 85c5096
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 13:53:39 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 13:53:39 2015 -0400
----------------------------------------------------------------------
.../src/clj/backtype/storm/daemon/worker.clj | 13 ++--
.../backtype/storm/utils/TransferDrainer.java | 62 +++++++++++++-------
2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4073dbe6/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index efa1d1a,fe64474..ae7b3ae
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -129,14 -129,12 +129,14 @@@
(.add local pair)
;;Using java objects directly to avoid performance issues in java code
- (let [node+port (get @task->node+port task)]
- (when (not (.get remoteMap node+port))
- (.put remoteMap node+port (ArrayList.)))
- (let [remote (.get remoteMap node+port)]
+ (do
+ (when (not (.get remoteMap task))
+ (.put remoteMap task (ArrayList.)))
+ (let [remote (.get remoteMap task)]
- (.add remote (TaskMessage. task (.serialize serializer tuple)))
- ))))
+ (if (not-nil? task)
+ (.add remote (TaskMessage. task (.serialize serializer tuple)))
+ (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
+ ))))
(local-transfer local)
(disruptor/publish transfer-queue remoteMap)
))]
@@@ -342,8 -340,9 +342,9 @@@
(when batch-end?
(read-locked endpoint-socket-lock
- (let [node+port->socket @node+port->socket]
- (.send drainer node+port->socket)))
- (let [node+port->socket @node+port->socket
- task->node+port @task->node+port]
- (.send drainer task->node+port node+port->socket)))
++ (let [node+port->socket @node+port->socket
++ task->node+port @task->node+port]
++ (.send drainer task->node+port node+port->socket)))
(.clear drainer))))))
;; Check whether this messaging connection is ready to send data