Posted to dev@storm.apache.org by HeartSaVioR <gi...@git.apache.org> on 2015/04/14 00:28:30 UTC

[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

GitHub user HeartSaVioR opened a pull request:

    https://github.com/apache/storm/pull/521

    [STORM-737] Check task->node+port with read lock to prevent sending to closed connection

    It's based on Nathan's comment; please refer to https://github.com/apache/storm/pull/349#issuecomment-87778672
    
    https://github.com/apache/storm/commit/861a92eab8740cfc0f83ac4d7ade9a2ab04a8b9b seems to have introduced a regression.
    However, it also introduced sending optimizations, so it shouldn't simply be discarded.
    
    I changed the send logic so that TransferDrainer matches each task to its node+port, which lets us keep the sending optimization.
    
    I'm still not familiar with Clojure, so please review and comment if it can be optimized.
    Thanks!

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HeartSaVioR/storm STORM-737

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/521.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #521
    
----
commit a1d7b3eb343f304565fe24fb7e0151bfbcb3824e
Author: Jungtaek Lim <ka...@gmail.com>
Date:   2015-04-13T22:16:37Z

    While sending tuple, check task->node+port with read lock
    
    * task->node+port is guaranteed to be consistent within the read lock
    ** see the write lock inside mk-refresh-connections
    * Let TransferDrainer match tasks to node+port
    ** so we can still benefit from the sending optimization

----
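
    A minimal Java sketch of the locking idea in this commit, under stated assumptions: a refresh step (standing in for mk-refresh-connections) swaps the task->node+port map and the connection map and closes dropped connections under the write lock, while the sender resolves task -> node+port -> connection under the read lock. `WorkerRouting`, `FakeConnection`, `refresh` and `send` are hypothetical names, not Storm classes.

    ```java
    import java.util.*;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical stand-in for an IConnection: records messages and refuses sends after close().
    class FakeConnection {
        final String hostPort;
        final List<String> received = new ArrayList<>();
        boolean closed = false;

        FakeConnection(String hostPort) { this.hostPort = hostPort; }

        void send(int task, String payload) {
            if (closed) throw new IllegalStateException("send on closed connection " + hostPort);
            received.add(task + ":" + payload);
        }

        void close() { closed = true; }
    }

    // Hypothetical routing state shared by the refresh thread and the transfer thread.
    class WorkerRouting {
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private Map<Integer, String> taskToNodePort = new HashMap<>();
        private Map<String, FakeConnection> connections = new HashMap<>();

        // Writer side (cf. mk-refresh-connections): build the new connection map, close
        // connections that are no longer needed, and swap both maps while holding the write lock.
        void refresh(Map<Integer, String> newAssignment) {
            lock.writeLock().lock();
            try {
                Map<String, FakeConnection> next = new HashMap<>();
                for (String hostPort : new HashSet<>(newAssignment.values())) {
                    FakeConnection existing = connections.get(hostPort);
                    next.put(hostPort, existing != null ? existing : new FakeConnection(hostPort));
                }
                for (Map.Entry<String, FakeConnection> e : connections.entrySet()) {
                    if (!next.containsKey(e.getKey())) e.getValue().close();
                }
                taskToNodePort = new HashMap<>(newAssignment);
                connections = next;
            } finally {
                lock.writeLock().unlock();
            }
        }

        // Reader side (cf. the transfer handler): resolve task -> node+port -> connection
        // and send while holding the read lock; tasks with no mapping are skipped.
        boolean send(int task, String payload) {
            lock.readLock().lock();
            try {
                String hostPort = taskToNodePort.get(task);
                FakeConnection conn = hostPort == null ? null : connections.get(hostPort);
                if (conn == null) return false;
                conn.send(task, payload);
                return true;
            } finally {
                lock.readLock().unlock();
            }
        }

        FakeConnection connectionFor(String hostPort) {
            lock.readLock().lock();
            try { return connections.get(hostPort); } finally { lock.readLock().unlock(); }
        }
    }
    ```

    Because closing and swapping happen while no reader holds the lock, a send resolved under the read lock cannot pick up a connection that refresh has already closed. In the PR the read-locked part is the batch flush in mk-transfer-tuples-handler rather than a per-tuple call.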


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101515533
  
    @d2r I can't reproduce the test failures. I'll try again.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30750154
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
       }
       
    -  public void send(HashMap<String, IConnection> connections) {
    -    for (String hostPort : bundles.keySet()) {
    -      IConnection connection = connections.get(hostPort);
    -      if (null != connection) { 
    -        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
    -        Iterator<TaskMessage> iter = getBundleIterator(bundle);
    -        if (null != iter && iter.hasNext()) {
    -          connection.send(iter);
    +  public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
    +
    +    for (String hostPort : bundleMapByDestination.keySet()) {
    +      if (hostPort != null) {
    --- End diff --
    
    @d2r 
    No. The null check was moved into groupBundleByDestination(), so we don't need to check again here. Thanks!
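
    A simplified Java sketch of the grouping step discussed here, assuming messages are plain strings instead of `TaskMessage` and using `List` instead of `ArrayList<ArrayList<...>>`; `GroupingSketch` is a hypothetical name. Tasks whose node+port is unknown are dropped while grouping, so the resulting map never gets a `null` key (which `java.util.HashMap` would otherwise accept) and the send loop needs no second check.

    ```java
    import java.util.*;

    // Simplified sketch of regrouping per-task bundles by destination host:port.
    class GroupingSketch {
        // task id -> message chunks buffered for that task (stand-in for TransferDrainer.bundles)
        final Map<Integer, List<List<String>>> bundles = new HashMap<>();

        // Buffer a chunk for a task; null or empty chunks are ignored.
        void add(int task, List<String> chunk) {
            if (chunk == null || chunk.isEmpty()) return;
            bundles.computeIfAbsent(task, k -> new ArrayList<>()).add(chunk);
        }

        // Regroup by host:port using a mapping captured under the read lock.
        // Tasks with no current mapping are skipped, so no null key is ever inserted.
        Map<String, List<List<String>>> groupByDestination(Map<Integer, String> taskToNodePort) {
            Map<String, List<List<String>>> byDest = new HashMap<>();
            for (Map.Entry<Integer, List<List<String>>> e : bundles.entrySet()) {
                String hostPort = taskToNodePort.get(e.getKey());
                if (hostPort == null) continue;
                byDest.computeIfAbsent(hostPort, k -> new ArrayList<>()).addAll(e.getValue());
            }
            return byDest;
        }
    }
    ```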


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101826046
  
    @clockfly 
    Could you take a look? I don't want to undo your optimization, only fix the issue.
    Thanks in advance!


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-106086856
  
    @d2r No problem, thanks for reviewing and running the performance test!


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101449371
  
    Please note that packets shouldn't carry values that can change while they pass through the transfer flow.
    task->node+port can change (that is, the node+port for a tuple can change), but a tuple's task id cannot.
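
    A tiny Java illustration of this point, assuming a queued packet holds only the task id and payload (`Packet` and `BindingDemo` are hypothetical names). Resolving node+port when the packet is enqueued captures a value that may be stale by the time it is sent; resolving it from the current mapping just before sending follows the reassignment.

    ```java
    import java.util.*;

    // Queued packet: only values that cannot change in flight (task id and payload).
    final class Packet {
        final int task;
        final String payload;
        Packet(int task, String payload) { this.task = task; this.payload = payload; }
    }

    class BindingDemo {
        // Returns [destination resolved at enqueue time, destination resolved at send time].
        static List<String> demo() {
            Map<Integer, String> taskToNodePort = new HashMap<>();
            taskToNodePort.put(5, "hostA:6700");

            Packet p = new Packet(5, "tuple-bytes");
            // Early binding: node+port looked up when the packet is enqueued.
            String resolvedAtEnqueue = taskToNodePort.get(p.task);

            // The task is reassigned while the packet is still waiting in the transfer queue.
            taskToNodePort.put(5, "hostB:6700");

            // Late binding: node+port looked up from the current mapping just before sending.
            String resolvedAtSend = taskToNodePort.get(p.task);
            return Arrays.asList(resolvedAtEnqueue, resolvedAtSend);
        }
    }
    ```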


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30463274
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    If we leave the code here unchanged, we will not need to re-group by destination later.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR closed the pull request at:

    https://github.com/apache/storm/pull/521


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101446038
  
    @d2r 
    First of all, I'm sorry; I expressed myself poorly.
    What I meant is that your approach follows part of Nathan's comment, but not all of it.
    Your patch also resolves STORM-737, but it drops some tuples when task->node+port changes, whereas we could save them.
    
    @nathanmarz said 861a92e introduced a regression, which means the code before 861a92e (0666c41387fc11c0422b26ab27ebc38c30fe26af) was correct.
    AFAIK mk-transfer-fn looks up task->node+port, while that lookup should happen inside the read lock; that is the source of the regression.
    If you look at the changeset of mk-transfer-tuples-handler in that commit, you can see that I'm trying to revert that part.
    The current PR uses the same logic (yes, the same) as the old code, but works with TransferDrainer.
    
    * PR
    ```
    (defn mk-transfer-tuples-handler [worker]
      (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (TransferDrainer.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
        (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.add drainer packets)
            
            (when batch-end?
              (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket
                      task->node+port @task->node+port]
                  (.send drainer task->node+port node+port->socket)))
              (.clear drainer))))))
    ```
    
    * Old (the version Nathan said was correct)
    https://github.com/apache/storm/blob/0666c41387fc11c0422b26ab27ebc38c30fe26af/storm-core/src/clj/backtype/storm/daemon/worker.clj
    ```
    (defn mk-transfer-tuples-handler [worker]
      (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
            drainer (ArrayList.)
            node+port->socket (:cached-node+port->socket worker)
            task->node+port (:cached-task->node+port worker)
            endpoint-socket-lock (:endpoint-socket-lock worker)
            ]
        (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.addAll drainer packets)
            (when batch-end?
              (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket
                      task->node+port @task->node+port]
                  ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
                  ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
                
                  (fast-list-iter [[task ser-tuple] drainer]
                    ;; TODO: consider write a batch of tuples here to every target worker  
                    ;; group by node+port, do multipart send              
                    (let [node-port (get task->node+port task)]
                      (when node-port
                        (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
                        ))))
              (.clear drainer))))))
    ```
    
    So I'd like you to review the current PR, point out any issues, and work on it together.
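
    For readers less familiar with Clojure, a rough Java transliteration of the handler shape above. It assumes a callback `onEvent(packets, batchEnd)` analogous to the `clojure-handler` function: packets are buffered on every event, and only when `batchEnd` is true are both maps dereferenced and the buffer flushed under the read lock. It sends per tuple, like the old 0666c41 version; the PR instead regroups by destination inside `TransferDrainer.send`. `TransferHandlerSketch` and `Sink` are hypothetical stand-ins for the worker state and `IConnection`.

    ```java
    import java.util.*;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical connection stand-in that records what it was sent.
    class Sink {
        final List<String> sent = new ArrayList<>();
        void send(int task, String msg) { sent.add(task + ":" + msg); }
    }

    class TransferHandlerSketch {
        // Stand-ins for :cached-task->node+port, :cached-node+port->socket and :endpoint-socket-lock.
        final AtomicReference<Map<Integer, String>> taskToNodePort = new AtomicReference<>(new HashMap<>());
        final AtomicReference<Map<String, Sink>> nodePortToSocket = new AtomicReference<>(new HashMap<>());
        final ReadWriteLock endpointSocketLock = new ReentrantReadWriteLock();

        // Buffer of (task, message) pairs, playing the role of the drainer.
        private final List<Map.Entry<Integer, String>> drainer = new ArrayList<>();

        void onEvent(List<? extends Map.Entry<Integer, String>> packets, boolean batchEnd) {
            drainer.addAll(packets); // always buffer
            if (!batchEnd) return;
            endpointSocketLock.readLock().lock();
            try {
                // Dereference both maps once, inside the read lock.
                Map<Integer, String> tasks = taskToNodePort.get();
                Map<String, Sink> sockets = nodePortToSocket.get();
                for (Map.Entry<Integer, String> p : drainer) {
                    String nodePort = tasks.get(p.getKey());
                    Sink sink = nodePort == null ? null : sockets.get(nodePort);
                    if (sink != null) sink.send(p.getKey(), p.getValue()); // unmapped tasks are skipped
                }
            } finally {
                endpointSocketLock.readLock().unlock();
            }
            drainer.clear();
        }
    }
    ```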


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101831525
  
    It may be better to revert mk-transfer-fn to 0666c41387fc11c0422b26ab27ebc38c30fe26af, since grouping by task can be handled in mk-transfer-tuples-handler (or doesn't need to be handled at all).


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30225056
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    Yeah, maybe we don't need to change that part.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-98857155
  
    @HeartSaVioR, I am sorry; I have been swamped with another task and have not had a chance to review.
    
    I have a [branch](https://github.com/apache/storm/compare/master...d2r:storm-737-prevent-send-to-invalid-socket), but I have not had an opportunity to really test it yet.
    
    I will try to get back to this within the week.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-103213278
  
    @d2r Thanks for reviewing!
    I'd like to get 6ef2f11 pulled in, so I'm closing this PR unless we agree that #521 is better than #557.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-98850849
  
    @nathanmarz @d2r You may want to take a look, since it's based on your discussion at https://github.com/apache/storm/pull/349#issuecomment-87778672.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30201415
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -26,10 +26,10 @@
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    --- End diff --
    
    @d2r 
    Looking at it again, it has to buffer tuples until batch-end? is true.
    0666c41 buffers tuples in an ArrayList, and 861a92e buffers them in TransferDrainer itself.
    So if we made add() private or removed it, we would need to introduce another buffer whose role would duplicate TransferDrainer's.
    (It may not be an issue if batch-end? is always true, but I don't know.)
    
    Which one do you think is better?


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30539591
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
       }
       
    -  public void send(HashMap<String, IConnection> connections) {
    -    for (String hostPort : bundles.keySet()) {
    -      IConnection connection = connections.get(hostPort);
    -      if (null != connection) { 
    -        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
    -        Iterator<TaskMessage> iter = getBundleIterator(bundle);
    -        if (null != iter && iter.hasNext()) {
    -          connection.send(iter);
    +  public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
    +
    +    for (String hostPort : bundleMapByDestination.keySet()) {
    +      if (hostPort != null) {
    --- End diff --
    
    Can `hostPort` (the key) be `null` in the map returned by `groupBundleByDestination`?


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101480982
  
    I see 4 test errors using your branch.  They all appear to be test timeouts.
    
    * integration-test/test-basic-topology
    * messaging-test/
       * test-receiver-message-order
       * test-local-transport
    * netty-integration-test/test-integration
    
    I ran `mvn test` on Linux and OSX and got the same results.  Can you please check?



[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30513365
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    @d2r 
    Accessing task->node+port in these lines is the root of this problem, since the mapping can change while the transfer is being prepared. So we should change the current implementation so that it does not access task->node+port here.
    Grouping by destination (node and port) should happen within the read lock.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-98942013
  
    @d2r Thanks for the comment, I can wait. :)
    
    Btw, your branch doesn't follow Nathan's comment.
    ```
    In short, the code in the write-lock is fine – it's the code in the read lock that needs to be fixed. As part of that, the code looking up the node+port for a task needs to be moved back to this function and not happen before the tuple goes on the transfer queue.
    ```
    The root cause is in mk-transfer-fn.
    It resolves each task to a node+port, but there is some latency between adding a tuple to the transfer queue and sending it via Netty. The task->node+port mapping can change during that window, so we should defer the lookup until just before sending, inside the read lock, where it is safe.
    
    So I'd like you to compare mine and yours when you come back. Thanks!


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30466962
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
    --- End diff --
    
    @d2r Please check my previous comment.
    https://github.com/apache/storm/pull/521#discussion_r30201415


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30750199
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
       }
       
    -  public void send(HashMap<String, IConnection> connections) {
    -    for (String hostPort : bundles.keySet()) {
    -      IConnection connection = connections.get(hostPort);
    -      if (null != connection) { 
    -        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
    -        Iterator<TaskMessage> iter = getBundleIterator(bundle);
    -        if (null != iter && iter.hasNext()) {
    -          connection.send(iter);
    +  public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
    +
    +    for (String hostPort : bundleMapByDestination.keySet()) {
    +      if (hostPort != null) {
    +        IConnection connection = connections.get(hostPort);
    +        if (null != connection) {
    +          ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
    +          Iterator<TaskMessage> iter = getBundleIterator(bundle);
    +          if (null != iter && iter.hasNext()) {
    +            connection.send(iter);
    +          }
             }
           }
    -    } 
    +    }
       }
    -  
    +
    +  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
    +    for (Integer task : this.bundles.keySet()) {
    +      String hostPort = taskToNode.get(task);
    +      if (hostPort != null) {
    +        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
    +          addListRefToMap(bundleMap, hostPort, chunk);
    +        }
    +      }
    +    }
    +    return bundleMap;
    +  }
    +
    +  private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles,
    --- End diff --
    
    @d2r OK, it's better than shadowing fields. Thanks!


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30539714
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -129,12 +129,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    --- End diff --
    
    If we have nothing in the binding form, then we can remove the `let` here.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/storm/pull/521


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30511279
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    I think we both agree.
    
    I suggest we do not change L132-137, since it already groups by destination.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30197889
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    @d2r 
    Yes, TransferDrainer doesn't have to do it.
    By the way, this makes me think we may not need to touch TransferDrainer at all, since its main role is grouping tuples by node+port.
    
    Could you confirm that I'm right?


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101648482
  
    > @d2r I can't reproduce test failures. I'll give it a try again.
    
    OK, please update your branch, and I will re-run the tests.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101841901
  
    I made another changeset that reverts everything to 0666c41 (without `try-serialize-local`) but keeps TransferDrainer as a buffer and grouper (by host+port).
    https://github.com/HeartSaVioR/storm/commit/6ef2f11e9eade772c8ae67a6410d537871739938
    
    If we think the latter is the better way forward, I'll post a new PR based on 6ef2f11e9eade772c8ae67a6410d537871739938.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101382574
  
    @HeartSaVioR 
    
    Thanks for taking a look. Well, I was trying to follow his comments. :)
    
    I noticed, though, that the line in my `let` binding
    ```Clojure
    valid-node+ports (vals task->node+port)
    ```
    maybe should have been
    ```Clojure
    valid-node+ports (vals @task->node+port)
    ```
    with the `@`.
    
    `task->node+port` is actually an atom, however, so if we dereference it within the read-lock there, we should have the accurate task assignment before we add anything to the queue to be sent.  I believe this addresses the concern you cited from @nathanmarz.  The other change was to encapsulate the separate TransferDrainer#add, since it does not need to be a separate public method.
    
    So, @HeartSaVioR, since we have both looked at my branch already, do you want to take my changes into your branch and continue the discussion from there?  It seems like a smaller set of changes to begin with, and maybe it would be easier going forward?
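The locking discipline discussed here (the refresh installs new `task->node+port` assignments under the write lock, while the send path reads them under the read lock) can be sketched in Java with `ReentrantReadWriteLock`. This is a minimal sketch, not Storm's worker code (which is Clojure and dereferences an atom); `AssignmentGuard`, `refresh`, and `sendAll` are hypothetical names, and the `send` callback stands in for writing to a connection.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiConsumer;

// Hypothetical Java analogue of the worker's task->node+port handling (not Storm's API).
final class AssignmentGuard {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  // Only replaced under the write lock; only read under the read lock.
  private Map<Integer, String> taskToNodePort = new HashMap<>();

  // Writer side: analogous to the refresh step that installs new assignments
  // (and, in the real worker, closes connections that are no longer needed).
  void refresh(Map<Integer, String> newAssignment) {
    lock.writeLock().lock();
    try {
      taskToNodePort = new HashMap<>(newAssignment);
    } finally {
      lock.writeLock().unlock();
    }
  }

  // Reader side: look up each destination and send while still holding the read lock,
  // so a refresh cannot swap the mapping between the lookup and the send.
  int sendAll(Iterable<Integer> tasks, BiConsumer<Integer, String> send) {
    lock.readLock().lock();
    try {
      int sent = 0;
      for (Integer task : tasks) {
        String nodePort = taskToNodePort.get(task);
        if (nodePort != null) { // no current assignment: skip instead of guessing
          send.accept(task, nodePort);
          sent++;
        }
      }
      return sent;
    } finally {
      lock.readLock().unlock();
    }
  }
}
```

Because the callback runs while the read lock is held, a refresh that wants to close a connection must wait for the in-flight send to finish; the trade-off is that a slow send delays the refresh.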


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-103145088
  
    @HeartSaVioR,
    
    I see you are right.  There are two race conditions here:
    
    1. If we do not check assignments in the read-lock, then we could use an invalid connection. There could be an exception if we lose this race.
    
    2. If we group by destination node+port when we enqueue tuples for sending, we lose the information needed to update the destination node+port when we dequeue them for sending, since the assignments in `task->node+port` can change in the meantime.  If we lose this race, then tuples for which there is a new assignment may be dropped unnecessarily (if we fix 1.) or sent to the wrong worker.
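Race 2 comes down to when the destination is looked up. A minimal sketch, assuming string payloads and a hypothetical `LateBindingBuffer` class (not part of Storm): buffer messages by task at enqueue time and resolve `task -> node+port` only at drain time, so a reassignment made in between is honored rather than baked into the buffer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration: key buffered messages by task, not by node+port,
// and bind each task to a destination only when the buffer is drained.
final class LateBindingBuffer {
  private final Map<Integer, List<String>> byTask = new HashMap<>();

  void enqueue(int task, String message) {
    byTask.computeIfAbsent(task, k -> new ArrayList<>()).add(message);
  }

  // Resolve destinations against the assignment that is current at drain time.
  Map<String, List<String>> drain(Map<Integer, String> taskToNodePort) {
    Map<String, List<String>> byDestination = new HashMap<>();
    for (Map.Entry<Integer, List<String>> e : byTask.entrySet()) {
      String nodePort = taskToNodePort.get(e.getKey());
      if (nodePort != null) { // tasks with no current assignment are dropped here
        byDestination.computeIfAbsent(nodePort, k -> new ArrayList<>()).addAll(e.getValue());
      }
    }
    byTask.clear();
    return byDestination;
  }
}
```

Had the buffer been keyed by node+port at enqueue time, the messages for a reassigned task would still point at the old worker when drained.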
    
    (Previously, I did not realize that 2. was also a regression introduced by 861a92e, so I was looking for a smaller change in this PR.)
    
    > If you think latter is better to move on, I'll post a new PR based on 6ef2f11.
    
    OK, I will take a look at your other branch.
    
    >  IMO it is a critical path for transfer efficiency, so it would be better to have more reviewer for this patch.
    
    I agree. The more, the better.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-106038982
  
    Tested with https://github.com/yahoo/storm-perf-test with the following arguments:
    ```
    --ack --bolt 4 --name test -l 1 -n 1 --workers 4 --spout 3 --testTimeSec 900
    -c topology.max.spout.pending=1092 --messageSize 10
    ```
    
    OK, I found both the data-rate and the latency are improved with this patch in my tests:
    
    master at 512d3def:
    Throughput in MB/s:
      max: 0.1519711812
      99:  0.1510783919
      90:  0.1276066783
      50:  0.1100190481
      min: 0.07258733114
    Complete Latency/10m: 265ms
    
    This PR 85c5096e merged to master at 512d3def:
    Throughput in MB/s:
      max: 0.1760864258
      99:  0.1707911174
      90:  0.1550458272
      50:  0.1419607798
      min: 0.1128451029
    Complete Latency/10m: 204ms
    
    I am also fine with the changes.
    
    +1
    
    Thank you for your patience, @HeartSaVioR.
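The relative changes between the two runs quoted above can be computed directly from the reported figures (values copied from the comment; `BenchDelta` is only an illustrative helper). Median throughput rises by roughly 29% and complete latency falls by roughly 23%; these are ratios from a single pair of runs and say nothing about run-to-run variance.

```java
// Illustrative helper: percentage change between the two benchmark runs above.
final class BenchDelta {
  // Positive means the second value is larger than the first.
  static double percentChange(double before, double after) {
    return (after - before) / before * 100.0;
  }

  public static void main(String[] args) {
    // Figures from "master at 512d3def" (before) and "This PR 85c5096e merged" (after).
    System.out.printf("max throughput:    %+.1f%%%n", percentChange(0.1519711812, 0.1760864258));
    System.out.printf("median throughput: %+.1f%%%n", percentChange(0.1100190481, 0.1419607798));
    System.out.printf("min throughput:    %+.1f%%%n", percentChange(0.07258733114, 0.1128451029));
    System.out.printf("complete latency:  %+.1f%%%n", percentChange(265, 204));
  }
}
```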


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30197303
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    If we use `task` as the key, doesn't that mean we are no longer batching tuples that go to the same `node+port`?  I think we want to do this.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30539645
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
       }
       
    -  public void send(HashMap<String, IConnection> connections) {
    -    for (String hostPort : bundles.keySet()) {
    -      IConnection connection = connections.get(hostPort);
    -      if (null != connection) { 
    -        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
    -        Iterator<TaskMessage> iter = getBundleIterator(bundle);
    -        if (null != iter && iter.hasNext()) {
    -          connection.send(iter);
    +  public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
    +
    +    for (String hostPort : bundleMapByDestination.keySet()) {
    +      if (hostPort != null) {
    +        IConnection connection = connections.get(hostPort);
    +        if (null != connection) {
    +          ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort);
    +          Iterator<TaskMessage> iter = getBundleIterator(bundle);
    +          if (null != iter && iter.hasNext()) {
    +            connection.send(iter);
    +          }
             }
           }
    -    } 
    +    }
       }
    -  
    +
    +  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
    +    for (Integer task : this.bundles.keySet()) {
    +      String hostPort = taskToNode.get(task);
    +      if (hostPort != null) {
    +        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
    +          addListRefToMap(bundleMap, hostPort, chunk);
    +        }
    +      }
    +    }
    +    return bundleMap;
    +  }
    +
    +  private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles,
    --- End diff --
    
    nit: Can we rename `bundles` to something like `bundleMap`, since it is also the name of a member?


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30197344
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -26,10 +26,10 @@
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    --- End diff --
    
    @d2r 
    Yes, we don't need this since we're adding tuples and sending them immediately. 
    I'll make it private and add a parameter to send. Thanks!


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30466949
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    @d2r 
    Think about a situation like this:
    
    * a set of tuples consisting of
      * 3 tuples for task 1 (worker A) -- (A)
      * 5 tuples for task 2 (worker A) -- (B)
      * 2 tuples for task 3 (worker B) -- (C)
    
    We can ensure we send tuples to the proper worker since we check task->node+port with the read lock.
    But if we don't re-group messages by worker, we'll send tuples batched by task, not by worker.
    That means that even though all 10 tuples are available at once, we send batches (A) and (B) to worker A separately instead of as one batch.
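Using the (A)/(B)/(C) example above, re-grouping per-task batches by destination turns three per-task sends into two per-worker sends: 8 tuples for worker A and 2 for worker B. A minimal sketch, assuming string payloads; `RegroupByWorker` and `groupByDestination` are hypothetical names, though the shape loosely mirrors the `groupBundleByDestination` method in the diff (it stores references to each task's chunk rather than copying messages).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: regroup per-task chunks by destination worker without copying,
// so each worker gets one send() carrying all of its chunks.
final class RegroupByWorker {
  static Map<String, List<List<String>>> groupByDestination(
      Map<Integer, List<String>> chunksByTask, Map<Integer, String> taskToNodePort) {
    Map<String, List<List<String>>> bundleMap = new HashMap<>();
    for (Map.Entry<Integer, List<String>> e : chunksByTask.entrySet()) {
      String nodePort = taskToNodePort.get(e.getKey());
      if (nodePort != null) {
        // Keep a reference to the task's chunk; individual messages are not copied.
        bundleMap.computeIfAbsent(nodePort, k -> new ArrayList<>()).add(e.getValue());
      }
    }
    return bundleMap;
  }
}
```

The number of entries in the returned map is the number of network sends, so the batching benefit grows with the number of tasks that share a worker.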


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-106087952
  
    I'm +1. As I mentioned on the other pull request, this patch actually improves performance, while the alternate approach created a performance regression.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-103187844
  
    > I made another changeset which reverts whole things to 0666c41 (without try-serialize-local) but leaves TransferDrainer as buffer and grouper (by host+port).
    > HeartSaVioR@6ef2f11
    > 
    > If you think latter is better to move on, I'll post a new PR based on 6ef2f11.
    
    I do prefer the changes on your other branch over those in this pull request because the other branch is cleaner, but it is up to you.  Either way, we will want to be careful.
    
    Thanks for looking at this.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30196125
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -26,10 +26,10 @@
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    --- End diff --
    
    We should remove `add` or make it private, as per [this comment](https://github.com/apache/storm/pull/349#issuecomment-87767343).  There is no need to have separate `add` and `send` methods here.
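If `add` becomes private, `send` is the only entry point and the buffer cannot outlive a call. A minimal sketch under stated assumptions: `Connection` is a hypothetical stand-in for Storm's `IConnection`, payloads are strings, and `OneShotDrainer` is a hypothetical name. For simplicity it copies messages into one list per destination, whereas the PR's diff keeps references to the per-task chunks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a transport connection.
interface Connection {
  void send(Iterator<String> messages);
}

// Hypothetical sketch: buffering is an internal detail of a single public send().
final class OneShotDrainer {
  private final Map<Integer, List<List<String>>> bundles = new HashMap<>();

  public void send(Map<Integer, List<String>> taskTupleSetMap,
                   Map<Integer, String> taskToNodePort,
                   Map<String, Connection> connections) {
    add(taskTupleSetMap);
    // Group buffered chunks by their current destination.
    Map<String, List<String>> byDestination = new HashMap<>();
    for (Map.Entry<Integer, List<List<String>>> e : bundles.entrySet()) {
      String nodePort = taskToNodePort.get(e.getKey());
      if (nodePort == null) {
        continue; // no current assignment for this task
      }
      for (List<String> chunk : e.getValue()) {
        byDestination.computeIfAbsent(nodePort, k -> new ArrayList<>()).addAll(chunk);
      }
    }
    bundles.clear(); // nothing survives past this call
    // One send per destination that has an open connection.
    for (Map.Entry<String, List<String>> e : byDestination.entrySet()) {
      Connection c = connections.get(e.getKey());
      if (c != null && !e.getValue().isEmpty()) {
        c.send(e.getValue().iterator());
      }
    }
  }

  // Private: callers can no longer add without sending.
  private void add(Map<Integer, List<String>> taskTupleSetMap) {
    for (Map.Entry<Integer, List<String>> e : taskTupleSetMap.entrySet()) {
      if (e.getValue() != null && !e.getValue().isEmpty()) {
        bundles.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
      }
    }
  }
}
```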


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30521681
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    > I suggest we do not change L132-137, since it already groups by destination.
    
    I will follow up again; I think I see what you mean in your other comment.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101825293
  
    @d2r 
    Modified TransferDrainer to re-group messages by destination when send is called.
    I left TransferDrainer.add() because it acts as a buffer, but I'm ready to remove TransferDrainer.add() if we think it's better to have a different buffer.
    I've merged in the latest upstream changes and ran the unit tests with no issues.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30198050
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -139,12 +139,12 @@
                       (.add local pair) 
     
                       ;;Using java objects directly to avoid performance issues in java code
    -                  (let [node+port (get @task->node+port task)]
    -                    (when (not (.get remoteMap node+port))
    -                      (.put remoteMap node+port (ArrayList.)))
    -                    (let [remote (.get remoteMap node+port)]
    +                  (let []
    +                    (when (not (.get remoteMap task))
    +                      (.put remoteMap task (ArrayList.)))
    +                    (let [remote (.get remoteMap task)]
                           (.add remote (TaskMessage. task (.serialize serializer tuple)))
    -                     )))) 
    +                    ))))
    --- End diff --
    
    @d2r 
    Maybe I misunderstood your comment.
    It seems I grouped tuples by task, not by node+port. They should be grouped again in mk-transfer-tuples-handler.
    I'll fix it. Maybe TransferDrainer can help with this, so please disregard my last comment.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-102749917
  
    Nathan's comments clarifying the real issue make complete sense.
    But we should verify which code needs to change when applying them.
    IMO this is a critical path for transfer efficiency, so it would be better to have more reviewers for this patch.



[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30463289
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
    --- End diff --
    
    This does not need to be public.  It is not necessary to separately add and send.


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-101475237
  
    > @d2r
    > First of all, I'm sorry I did some mistakes about expression.
    
    No worries.
    
    > So, I'd like you to review current PR, and find out issues, and go together.
    
    OK, sounds good to me. Let me take a look.



[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-106087173
  
    @ptgoetz Could you take a look so we can complete this PR? Thanks in advance!


[GitHub] storm pull request: [STORM-737] Check task->node+port with read lo...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/521#issuecomment-104045430
  
    @d2r @ptgoetz Applied @d2r's comments.

