You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/29 19:55:50 UTC

[5/6] storm git commit: Merge branch 'STORM-737' of github.com:HeartSaVioR/storm

Merge branch 'STORM-737' of github.com:HeartSaVioR/storm

Conflicts:
	storm-core/src/clj/backtype/storm/daemon/worker.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4073dbe6
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4073dbe6
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4073dbe6

Branch: refs/heads/master
Commit: 4073dbe6cc98c55b9705888ae697778a9974ad30
Parents: 3f50b72 85c5096
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 29 13:53:39 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 29 13:53:39 2015 -0400

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/worker.clj    | 13 ++--
 .../backtype/storm/utils/TransferDrainer.java   | 62 +++++++++++++-------
 2 files changed, 48 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4073dbe6/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/daemon/worker.clj
index efa1d1a,fe64474..ae7b3ae
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@@ -129,14 -129,12 +129,14 @@@
                    (.add local pair) 
  
                    ;;Using java objects directly to avoid performance issues in java code
-                   (let [node+port (get @task->node+port task)]
-                     (when (not (.get remoteMap node+port))
-                       (.put remoteMap node+port (ArrayList.)))
-                     (let [remote (.get remoteMap node+port)]
+                   (do
+                     (when (not (.get remoteMap task))
+                       (.put remoteMap task (ArrayList.)))
+                     (let [remote (.get remoteMap task)]
 -                      (.add remote (TaskMessage. task (.serialize serializer tuple)))
 -                      ))))
 +                      (if (not-nil? task)
 +                        (.add remote (TaskMessage. task (.serialize serializer tuple)))
 +                        (log-warn "Can't transfer tuple - task value is nil. tuple type: " (pr-str (type tuple)) " and information: " (pr-str tuple)))
 +                     ))))
                  (local-transfer local)
                  (disruptor/publish transfer-queue remoteMap)
                ))]
@@@ -342,8 -340,9 +342,9 @@@
          
          (when batch-end?
            (read-locked endpoint-socket-lock
-             (let [node+port->socket @node+port->socket]
-               (.send drainer node+port->socket)))
 -            (let [node+port->socket @node+port->socket
 -                  task->node+port @task->node+port]
 -              (.send drainer task->node+port node+port->socket)))
++             (let [node+port->socket @node+port->socket
++                   task->node+port @task->node+port]
++               (.send drainer task->node+port node+port->socket)))
            (.clear drainer))))))
  
  ;; Check whether this messaging connection is ready to send data