Posted to dev@storm.apache.org by tedxia <gi...@git.apache.org> on 2014/09/24 14:48:08 UTC

[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

GitHub user tedxia opened a pull request:

    https://github.com/apache/storm/pull/268

    STORM-329 : buffer message in client and reconnect remote server async

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tedxia/incubator-storm master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/storm/pull/268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #268
    
----
commit 8c52a5b518021a6beff372acbeb66a963a1d4f74
Author: xiajun <xi...@xiaomi.com>
Date:   2014-09-24T12:39:18Z

    STORM-329 : buffer message in client and reconnect remote server async

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59676137
  
    ```
    When a target worker is down, data being sent to other target workers should not be blocked.
    The approach we are currently using is to drop messages when the connection to the target worker is not available.
    ```
    
    This solution may need further discussion:
    
    approach A (adopted in the current patch):
    If we drop the message, the dropped message may take up to 30 seconds to be replayed (depending on the config topology.message.timeout.secs).
    At the same time, it is safer for the current worker (no OOM, especially for unacked topologies) and for messages dispatched to other workers (no blocking).
    
    approach B:
    If we buffer in the Netty client, the latency differs between two cases:
      case 1: the target worker is alive, we are reconnecting, and the reconnection will eventually succeed. The latency includes the time to reconnect to the target worker plus the flusher's time interval.
      case 2: the target worker is not alive, but the source worker is not yet aware of that. In this case, the latency will be the same as in approach A (30 seconds by default).
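The difference between approaches A and B can be sketched as a per-destination sender that either drops or buffers messages while its connection is down. This is a minimal illustration, not Storm's Client: `PendingSender`, its `Policy` enum, and the `maxBuffered` cap are hypothetical names, messages are plain strings, and a list stands in for the network.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: how a client might treat messages while its connection is down.
final class PendingSender {
    enum Policy { DROP, BUFFER }  // approach A vs. approach B

    private final Policy policy;
    private final int maxBuffered;                         // hypothetical cap against OOM under BUFFER
    private final Deque<String> buffer = new ArrayDeque<>();
    private final List<String> wire = new ArrayList<>();   // stands in for the network
    private boolean connected = false;
    private long dropped = 0;

    PendingSender(Policy policy, int maxBuffered) {
        this.policy = policy;
        this.maxBuffered = maxBuffered;
    }

    void send(String msg) {
        if (connected) {
            wire.add(msg);
        } else if (policy == Policy.BUFFER && buffer.size() < maxBuffered) {
            buffer.addLast(msg);  // held until the reconnect succeeds
        } else {
            dropped++;            // with acking enabled, the spout may replay it after topology.message.timeout.secs
        }
    }

    void onConnected() {
        connected = true;
        while (!buffer.isEmpty()) {
            wire.add(buffer.pollFirst());  // flush buffered messages in arrival order
        }
    }

    void onDisconnected() {
        connected = false;
    }

    List<String> delivered() { return wire; }

    long droppedCount() { return dropped; }
}
```

Without the cap, a long outage under BUFFER would grow the queue without bound, which is the OOM risk that approach A avoids.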
    
    approach C:
    @HeartSaVioR suggested that it may be more reasonable to buffer messages outside of the Netty client, ideally in a map that can be looked up by task ID, so that we can still deliver messages to the target task ID if the mapping from task ID to worker changes.
    
    This approach requires the user of the messaging layer (the Netty client user) to know the status of the connection (possibly via a new interface, ConnectionWithStatus). It also needs a larger change in the Clojure code (for efficiency and performance, we want to group messages for the same target host together).
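A minimal sketch of approach C, assuming a buffer that lives outside the Netty client and a lookup function from task ID to the worker that currently hosts the task. `TaskBuffer`, the `workerOfTask` function, and the `host:port` strings are hypothetical; resolving the worker only at drain time is what lets buffered messages follow a changed assignment, and grouping by worker reflects the wish to batch messages for the same host.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of approach C: buffer outside the client, keyed by task ID.
final class TaskBuffer {
    private final Map<Integer, List<String>> pending = new LinkedHashMap<>();

    void buffer(int taskId, String msg) {
        pending.computeIfAbsent(taskId, id -> new ArrayList<>()).add(msg);
    }

    // Resolve each task's *current* worker at drain time and group messages by worker,
    // so a changed task-to-worker mapping still routes buffered messages correctly.
    Map<String, List<String>> drainGroupedByWorker(Function<Integer, String> workerOfTask) {
        Map<String, List<String>> byWorker = new HashMap<>();
        for (Map.Entry<Integer, List<String>> e : pending.entrySet()) {
            String worker = workerOfTask.apply(e.getKey());
            byWorker.computeIfAbsent(worker, w -> new ArrayList<>()).addAll(e.getValue());
        }
        pending.clear();
        return byWorker;
    }
}
```

For example, if task 1 moved to a new worker while its messages were buffered, the drain routes them to whatever `workerOfTask.apply(1)` returns at that moment.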



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73651894
  
    +1 on the update; the patch is well written. 
    
    I made a few comments in the review panel of
    https://github.com/miguno/storm/commit/8ebaaf8dbc63df3c2691e0cc3ac5102af7721ec3#diff-e1bd524877b15ccf409f846e3c95da13R203



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66836806
  
    @tedxia This pull request has merge conflicts, would you mind syncing up with the master branch?
    
    @nathanmarz There's been a considerable amount of discussion since your -1; could you re-evaluate?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-62509705
  
    @tedxia 
    
    The thread pool size of clientScheduleService is determined by the number of workers (clamped to between 1 and 10).
    For example, if there are 2 workers, the pool size is 1; if there are 4 workers, the pool size is 3.
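Both examples fit "number of workers minus one, clamped to [1, 10]". The sketch below encodes that reading; the formula is inferred from the two examples rather than taken from the Storm source, and `ClientSchedulerSizing` is a hypothetical name.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical sizing rule inferred from the examples: (workers - 1), clamped to [1, 10].
final class ClientSchedulerSizing {
    static final int MIN_THREADS = 1;
    static final int MAX_THREADS = 10;

    static int poolSize(int workerCount) {
        // Assumed rationale: a worker talks to at most the other (workerCount - 1) workers.
        int otherWorkers = workerCount - 1;
        return Math.max(MIN_THREADS, Math.min(MAX_THREADS, otherWorkers));
    }

    static ScheduledExecutorService newClientScheduler(int workerCount) {
        return Executors.newScheduledThreadPool(poolSize(workerCount));
    }
}
```

Under this rule, 12 workers would also get 10 threads, and a single worker still gets 1.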



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r19519682
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -378,9 +392,15 @@
     
             _ (refresh-connections nil)
             _ (refresh-storm-active worker nil)
    +
    +        receive-thread-shutdown (launch-receive-thread worker)
    +
    +        ;; make sure all messaging connections are ready for sending data, the netty messaging
    +        ;; client will drop the messages if client.send(msg) is called before the connection
    +        ;; is established.
    +        _ (wait-messaging-connections-to-be-ready worker)
      
    --- End diff --
    
    The wait here solves a corner case.
    
    When the topology boots up, the previous approach started the spout immediately, and the spout used the IConnection layer before the IConnection was available.
    
    wait-messaging-connections-to-be-ready ensures that the connection layer is ready before the spout starts.
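The idea behind `wait-messaging-connections-to-be-ready` can be sketched in Java as a polling gate. This is not the Clojure implementation from the diff: each connection is modelled as a hypothetical `BooleanSupplier` that reports readiness, and `ConnectionGate`, the timeout, and the poll interval are illustrative.

```java
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a startup gate: spouts start only after every outbound
// connection reports ready, so early emits are not dropped by an unconnected client.
final class ConnectionGate {
    static boolean waitUntilAllReady(List<BooleanSupplier> connections, long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            boolean allReady = true;
            for (BooleanSupplier isReady : connections) {
                if (!isReady.getAsBoolean()) {
                    allReady = false;
                    break;
                }
            }
            if (allReady) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;  // timed out; the caller decides whether to proceed or fail
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

A caller would start spouts only after the gate returns `true`, and decide separately what to do on a timeout.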



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-64768289
  
    @clockfly `connect` and `send` are both synchronized, which may also cause blocking. Consider this scenario:
    
    1. Worker A sends messages to B and C, but B has crashed, so we start a reconnect in the scheduler (it will not be scheduled until send finishes) and send data to C;
    2. Worker A starts reconnecting to B, which takes a long time;
    3. Worker A sends messages to B again; this blocks until the reconnect to B finishes, which also blocks sending messages to other workers.
    
    ##### The biggest problem is that we sleep inside connect, which causes long lock contention. I will try to fix this.
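One common way to avoid sleeping while holding the client's lock is to turn the retry delay into a scheduled task, so that `send` never waits for a reconnect. The sketch below assumes a `ScheduledExecutorService` and models a single connection attempt as a hypothetical `BooleanSupplier`; `AsyncReconnectClient` is an illustrative name, and messages sent while disconnected are simply counted as dropped.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical sketch: reconnect via scheduled retries instead of sleeping under a lock.
final class AsyncReconnectClient {
    private final ScheduledExecutorService scheduler;
    private final BooleanSupplier tryConnectOnce;  // one connection attempt, true on success
    private final long retryDelayMs;
    private final AtomicBoolean connected = new AtomicBoolean(false);
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    private final AtomicInteger droppedWhileDown = new AtomicInteger();

    AsyncReconnectClient(ScheduledExecutorService scheduler, BooleanSupplier tryConnectOnce, long retryDelayMs) {
        this.scheduler = scheduler;
        this.tryConnectOnce = tryConnectOnce;
        this.retryDelayMs = retryDelayMs;
    }

    // Never waits for a reconnect: if the link is down, trigger one and return at once.
    void send(String msg) {
        if (connected.get()) {
            return;  // writing msg to the channel is omitted in this sketch
        }
        droppedWhileDown.incrementAndGet();
        startReconnect();
    }

    void markDisconnected() {
        connected.set(false);
        startReconnect();
    }

    private void startReconnect() {
        // Only one reconnect chain runs at a time.
        if (reconnecting.compareAndSet(false, true)) {
            scheduler.execute(this::attempt);
        }
    }

    private void attempt() {
        if (tryConnectOnce.getAsBoolean()) {
            connected.set(true);
            reconnecting.set(false);
        } else {
            // The back-off is a scheduled delay, not a sleep while holding a lock.
            scheduler.schedule(this::attempt, retryDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    boolean isConnected() { return connected.get(); }

    int droppedWhileDown() { return droppedWhileDown.get(); }

    // Demo helper: poll until connected or the timeout elapses.
    boolean awaitConnected(long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!connected.get() && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return connected.get();
    }
}
```

The design choice is that a dead destination costs `send` only an atomic check, so sends to other workers are never held up by one slow reconnect.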



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by MaximeDiab <gi...@git.apache.org>.
Github user MaximeDiab commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r24071455
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -142,6 +147,15 @@ public void run() {
                     
                 }
             };
    +
    +        connector = new Runnable() {
    +            @Override
    +            public void run() {
    +                if (!closing) {
    +                    connect();
    +                }
    +            }
    +        };
             
             long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
    --- End diff --
    
    The Math.min(arg1, arg2) method returns the smaller of its two arguments.
    In this case, the value returned by Math.min acts as the maximum delay: the initial delay is at most 30 seconds. The comment is perhaps not accurate, but I think this is not really a problem.
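A quick way to see how the quoted expression behaves is to evaluate it for a few settings. The wrapper below is a hypothetical helper that only repeats the expression from the diff; the values 1000 ms and 30 retries are chosen because they match the `maxSleepTimeMs` and `maxRetries` printed in a log later in this thread.

```java
// Hypothetical wrapper around the expression quoted in the diff above.
final class InitialDelay {
    static final long CAP_MS = 30L * 1000;  // "max wait for 30s"

    static long initialDelayMs(long maxSleepMs, long maxRetries) {
        // Math.min picks the smaller value, so the result never exceeds CAP_MS.
        return Math.min(CAP_MS, maxSleepMs * maxRetries);
    }
}
```

With `maxSleepMs = 1000` and `maxRetries = 30` the product is 30000, so the result is exactly the 30 s cap; with `maxSleepMs = 100` it is 3000 ms.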



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73889052
  
    The bolt-of-death topology is now available:
    https://github.com/verisign/storm-bolt-of-death



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73667341
  
    Many thanks for your review, Sean.  I addressed your comments; see the new commits in https://github.com/miguno/storm/commits/0.10.0-SNAPSHOT-STORM-392-miguno-merge.
    
    One final question:  How should we address the following TODO in `send(Iterator<TaskMessage> msgs)`?  I'd appreciate additional eyeballs on this. :-)  It's a scenario that we may or may not have overlooked in the original code.  Note how we are NOT checking for a `WRITABLE` channel while flushing (full) message batches (cf. the `while` loop), but we do check for a `WRITABLE` channel when handling any left-over messages (cf. after the `while` loop).
    
    ```java
        /**
         * Enqueue task messages to be sent to the remote destination (cf. `host` and `port`).
         */
        @Override
        public synchronized void send(Iterator<TaskMessage> msgs) {
    
            // ...some code removed to shorten this code snippet...
    
            Channel channel = channelRef.get();
            if (!connectionEstablished(channel)) {
                // Closing the channel and reconnecting should be done before handling the messages.
                closeChannelAndReconnect(channel);
                handleMessagesWhenConnectionIsUnavailable(msgs);
                return;
            }
    
            // Collect messages into batches (to optimize network throughput), then flush them.
            while (msgs.hasNext()) {
                TaskMessage message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }
    
                messageBatch.add(message);
                // TODO: What shall we do if the channel is not writable?
                if (messageBatch.isFull()) {
                    MessageBatch toBeFlushed = messageBatch;
                    flushMessages(channel, toBeFlushed);
                    messageBatch = null;
                }
            }
    
            // Handle any remaining messages in case the "last" batch was not full.
            if (containsMessages(messageBatch)) {
                if (connectionEstablished(channel) && channel.isWritable()) {
                    // We can write to the channel, so we flush the remaining messages immediately to minimize latency.
                    pauseBackgroundFlushing();
                    MessageBatch toBeFlushed = messageBatch;
                    messageBatch = null;
                    flushMessages(channel, toBeFlushed);
                }
                else {
                    // We cannot write to the channel, which means Netty's internal write buffer is full.
                    // In this case, we buffer the remaining messages and wait for the next messages to arrive.
                    //
                    // Background:
                    // Netty 3.x maintains an internal write buffer with a high water mark for each channel (default: 64K).
                    // This represents the amount of data waiting to be flushed to operating system buffers.  If the
                    // outstanding data exceeds this value then the channel is set to non-writable.  When this happens, a
                    // INTEREST_CHANGED channel event is triggered.  Netty sets the channel to writable again once the data
                    // has been flushed to the system buffers.
                    //
                    // See http://stackoverflow.com/questions/14049260
                    resumeBackgroundFlushing();
                    nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                }
            }
    ```
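One possible answer to the TODO, sketched with hypothetical stand-ins rather than Netty types: check writability before writing each full batch as well, and keep full batches queued in order when the channel is not writable, so that a later flush (for example from the background flusher) can send them. `WritableSink`, `BatchingSender`, and the list-of-strings batches are assumptions for illustration; whether this is the right policy for Storm is exactly what the question above leaves open.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for a Netty channel; not Netty's API.
interface WritableSink {
    boolean isWritable();
    void write(List<String> batch);
}

// Hypothetical sketch: full batches are only written while the sink is writable;
// otherwise they wait, in order, until a later flush.
final class BatchingSender {
    private final WritableSink sink;
    private final int batchSize;
    private final Deque<List<String>> fullBatchesWaiting = new ArrayDeque<>();
    private List<String> current = new ArrayList<>();

    BatchingSender(WritableSink sink, int batchSize) {
        this.sink = sink;
        this.batchSize = batchSize;
    }

    synchronized void send(Iterator<String> msgs) {
        while (msgs.hasNext()) {
            current.add(msgs.next());
            if (current.size() >= batchSize) {
                fullBatchesWaiting.addLast(current);
                current = new ArrayList<>();
                flushWhileWritable();  // the TODO case: check writability before writing a full batch
            }
        }
        // Flush a partial batch only if nothing older is waiting, to preserve ordering.
        if (!current.isEmpty() && fullBatchesWaiting.isEmpty() && sink.isWritable()) {
            sink.write(current);
            current = new ArrayList<>();
        }
    }

    // In a real client this would also run from a background flusher or a writability callback.
    synchronized void flushWhileWritable() {
        while (!fullBatchesWaiting.isEmpty() && sink.isWritable()) {
            sink.write(fullBatchesWaiting.pollFirst());
        }
    }

    synchronized int waitingBatches() { return fullBatchesWaiting.size(); }

    synchronized int partialSize() { return current.size(); }
}
```

Note that `fullBatchesWaiting` is unbounded here; a real client would need a limit, which brings the drop-versus-buffer trade-off from earlier in this thread back into play.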
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73276378
  
    Thanks for the follow-up, @tedxia.
    
    It looks as if I am able to report progress.  My conversation with @clockfly earlier today was very helpful in this regard -- thanks, Sean!
    
    I have refactored/rewritten our Netty Client code (`Client.java`).  The first experiments using this new code are positive:  The upstream bolts do not die anymore if the downstream bolts die (cf. my test scenario / test topology above), and as soon as the downstream bolts come back to life they will receive and process new input tuples.
    
    I'll have to do some follow-up work on the code -- there is at least one piece of it that I do not like because it's more of a hack -- as well as perform additional tests next week, of course.  By then I should also be able to share the test topology I was referring to above so that you guys can reproduce the same test/failure setup.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59363502
  
    @tedxia 
    
    Based on @clockfly's classification:
    item 1 is related to STORM-404, and item 2 is related to STORM-510.
    Item 3 is related to the current issue, but this pull request doesn't provide such options.
    
    Actually, this PR doesn't resolve STORM-329, so it cannot be accepted.
    
    By the way, I recommend resolving the issues one by one (small changesets).
    That makes it easier for reviewers to review the changes.
    I would like to contribute to your effort. Please feel free to mention me when there's something I can help with.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-72653588
  
    @tedxia (or @clockfly): Have you experienced similar Storm behavior as [I described above](https://github.com/apache/storm/pull/268#issuecomment-72652704) in your patched production cluster?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-63443152
  
    Random failures seem to be common in the Storm unit tests.
    
    We may need to clean up the Storm unit tests to make them faster and more robust.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73924933
  
    The new pull request is available at https://github.com/apache/storm/pull/428.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-60863620
  
    Hi Ted,
    
    ```
    This will still cause another worker to crash.
    Two things now happen in parallel: first, nimbus informs worker A that worker B is not alive (via a ZK timeout); second, worker A connects to worker B via Netty. When worker B dies for some reason, we can't guarantee which happens first. If the connection from worker A to worker B breaks first, the client's status becomes closing, but nimbus has not yet informed worker A that worker B has died, so worker A will still send messages to worker B, the exception will be thrown, and what STORM-404 describes will happen.
    ```
    
    Suppose worker A sends messages to worker B.
    If B dies, A will make multiple reconnection attempts. A will get notified that B is dead and will set the closing flag of its Client (to B), which aborts the reconnection process; no RuntimeException will be thrown.
    
    If there is a network partition, suppose worker A is isolated from the rest of the cluster. A will not get notified that B is not alive. A will then keep retrying the reconnection to B multiple times, throw a RuntimeException, and eventually exit. In this case A cannot recover, so throwing is the best option.
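The behaviour described above (abort quietly once the worker is told the target is dead, but throw after exhausting retries otherwise) can be sketched as follows. `ReconnectLoop` and its `connect` signature are hypothetical, a single connection attempt is modelled as a `BooleanSupplier`, and the back-off sleep between attempts is left out.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of "abort on close, throw on exhausted retries".
final class ReconnectLoop {
    private final AtomicBoolean closing = new AtomicBoolean(false);

    // Called when the worker learns (e.g. via ZooKeeper) that the remote worker is gone.
    void close() {
        closing.set(true);
    }

    // Returns true once connected, false if aborted because close() was called.
    // Throws if every retry fails without a close() (e.g. this worker is partitioned).
    boolean connect(BooleanSupplier attempt, int maxRetries) {
        for (int i = 0; i < maxRetries; i++) {
            if (closing.get()) {
                return false;  // remote worker known dead: abort quietly, no exception
            }
            if (attempt.getAsBoolean()) {
                return true;
            }
            // back-off sleep between attempts omitted in this sketch
        }
        throw new RuntimeException("reconnect failed after " + maxRetries + " retries");
    }
}
```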
    
    
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61393801
  
    Summary of the test:
    ============
    UT pass
    STORM-404 pass
    STORM-510 pass
    Performance regression pass
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61286844
  
    About the performance test:
    ===========================
    I tested the performance of the new patch.
    
    There is no significant difference compared with storm-0.9.2. 
    
    About the STORM-404 chained crash issue (one worker causing another worker to crash)
    ========================
    With this patch, the reconnection is successfully aborted, and a new connection is established.
    
    ```
    2014-10-31T23:00:11.738+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [30]
    2014-10-31T23:00:12.738+0800 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-IDHV22-04/192.168.1.54:6703
    2014-10-31T23:00:12.739+0800 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-IDHV22-04/192.168.1.54:6703..., timeout: 600000ms, pendings: 0
    2014-10-31T23:00:32.754+0800 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (30). Pinning to 29
    2014-10-31T23:00:32.754+0800 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [30]
    2014-10-31T23:00:32.754+0800 b.s.m.n.Client [INFO] New Netty Client, connect to IDHV22-01, 6702, config: , buffer_size: 5242880
    2014-10-31T23:00:32.754+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-01/192.168.1.51:6702... [0]
    2014-10-31T23:00:32.755+0800 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-IDHV22-01/192.168.1.51:6702, [id: 0x4f7eb44b, /192.168.1.51:56592 => IDHV22-01/192.168.1.51:6702]
    ```
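The log above prints the retry parameters (`baseSleepTimeMs` 100, `maxSleepTimeMs` 1000, `maxRetries` 30, pinned to 29 by the `ExponentialBackoffRetry` warning). A simplified bounded exponential backoff with those parameters is sketched below; the doubling-then-cap formula and the `BoundedBackoff` name are assumptions for illustration, not necessarily the exact schedule `StormBoundedExponentialBackoffRetry` computes.

```java
// Simplified bounded exponential backoff (assumed formula, for illustration only).
final class BoundedBackoff {
    // Bound the exponent (as the log's "Pinning to 29" warning does for retries) so the shift stays small.
    private static final int MAX_EXPONENT = 29;

    private final long baseSleepMs;
    private final long maxSleepMs;

    BoundedBackoff(long baseSleepMs, long maxSleepMs) {
        this.baseSleepMs = baseSleepMs;
        this.maxSleepMs = maxSleepMs;
    }

    // Delay before 0-based retry number retryCount: baseSleepMs * 2^retryCount, capped at maxSleepMs.
    long sleepMs(int retryCount) {
        int exponent = Math.min(Math.max(retryCount, 0), MAX_EXPONENT);
        return Math.min(baseSleepMs << exponent, maxSleepMs);
    }
}
```

With a base of 100 ms and a max of 1000 ms, the delays under this formula are 100, 200, 400, and 800 ms, then 1000 ms for every later retry.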



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r18688234
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/IConnection.java ---
    @@ -40,7 +41,7 @@
          * @param msgs
          */
     
    -    public void send(Iterator<TaskMessage> msgs);
    +    public void send(ArrayList<TaskMessage> msgs);
    --- End diff --
    
    Actually, this doesn't need to change; I will change it back. 



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-60866235
  
    @clockfly, when worker A does not get notified that worker B is not alive, the connection from A to B should be aborted rather than throwing a RuntimeException, because throwing would cause worker A to exit even if its connections to other workers are normal. Many factors can prevent nimbus from informing worker A that worker B has died, such as heavy load on ZooKeeper, a nimbus failure, and so on. For those reasons, even in the case of a network partition, I still don't think it is a good idea for worker A to exit; the price in fault tolerance is too high.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59317470
  
    Hi Ted,
    
    You are trying to achieve three things:
    1. When a worker is down, we don't want the upstream worker to fail in a chained manner.
    2. When some connection is unavailable, we don't want other connections to wait for it.
    3. When messages come too fast, we need a strategy to handle them: either drop them or cache them. It seems your TimeCacheMap is used for this purpose.
    
    We had better split these into small patches and solve 1 and 2 in this patch. I will work with you and contribute to this pull request.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59366452
  
    Thanks to @clockfly and @HeartSaVioR. I will take all your suggestions and add some patches later.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-58614296
  
    In this patch, the Client has a TimeCacheMap to cache new messages that haven't been sent yet. The worker only needs to add messages to the corresponding Client, and the Client decides when to send newly arrived messages. Most of the time the Client sends newly arrived messages as soon as possible, except when the connection has been lost or there are older messages still waiting. That is why I removed "send" from worker.clj.
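Storm's `TimeCacheMap` expires entries by rotating a fixed set of buckets. The sketch below is a simplified, hypothetical version of that idea for pending messages, not the class used in the patch: `RotatingPendingCache` is an illustrative name, rotation is triggered by calling `rotate()` instead of by a background thread, and the expiry callback stands in for whatever the client does with messages that were never sent.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical, simplified bucket-rotation cache in the spirit of TimeCacheMap.
final class RotatingPendingCache<K, V> {
    private final Deque<Map<K, V>> buckets = new ArrayDeque<>();  // first = newest, last = oldest
    private final BiConsumer<K, V> onExpire;

    RotatingPendingCache(int numBuckets, BiConsumer<K, V> onExpire) {
        if (numBuckets < 2) {
            throw new IllegalArgumentException("numBuckets must be at least 2");
        }
        for (int i = 0; i < numBuckets; i++) {
            buckets.addLast(new LinkedHashMap<>());
        }
        this.onExpire = onExpire;
    }

    // A (re)put always lands in the newest bucket, resetting the entry's age.
    synchronized void put(K key, V value) {
        for (Map<K, V> bucket : buckets) {
            bucket.remove(key);
        }
        buckets.peekFirst().put(key, value);
    }

    // Called once a message has been sent successfully.
    synchronized V remove(K key) {
        for (Map<K, V> bucket : buckets) {
            if (bucket.containsKey(key)) {
                return bucket.remove(key);
            }
        }
        return null;
    }

    // A real cache runs this from a timer; here the caller triggers it.
    // The oldest bucket expires and an empty bucket becomes the newest.
    synchronized void rotate() {
        Map<K, V> expired = buckets.pollLast();
        buckets.addFirst(new LinkedHashMap<>());
        expired.forEach(onExpire);
    }

    synchronized int size() {
        int total = 0;
        for (Map<K, V> bucket : buckets) {
            total += bucket.size();
        }
        return total;
    }
}
```

With 3 buckets, an entry that is neither sent nor re-put expires on the third rotation after it was added.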
    
    The Client has a state machine representing its current state: NEW, CONNECTED, DISCONNECTED, or CLOSED. Initially the Client's state is NEW; we then connect to the remote worker by immediately starting a timer that runs the connect logic (we call it the connector). If the connection succeeds, the Client's state becomes CONNECTED, and the connector starts a periodic timer that sends messages to the remote worker (we call it the flusher). If the connection fails, the connector retries a few times until it succeeds or the Client's state becomes CLOSED. When the Client's state becomes CLOSED, the Client is destroyed. The flusher sends cached messages periodically; when a flush fails, the Client's state becomes DISCONNECTED and the connector is started immediately. To reduce message transfer latency, we also start the flusher immediately when new messages arrive; of course, the flusher only works when the Client's state is CONNECTED.
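The transitions described above can be written down as a small table. This is a hypothetical sketch of the described state machine, not code from the patch: `ClientStateMachine` is an illustrative name, and the connector and flusher timers are left out so that only the allowed transitions and the "flush only when CONNECTED" rule remain.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the described Client state machine (timers omitted).
final class ClientStateMachine {
    enum State { NEW, CONNECTED, DISCONNECTED, CLOSED }

    private static final Map<State, Set<State>> ALLOWED = new EnumMap<>(State.class);

    static {
        // NEW stays NEW while the connector retries; it leaves on success or close.
        ALLOWED.put(State.NEW, EnumSet.of(State.CONNECTED, State.CLOSED));
        // A failed flush moves CONNECTED to DISCONNECTED and restarts the connector.
        ALLOWED.put(State.CONNECTED, EnumSet.of(State.DISCONNECTED, State.CLOSED));
        ALLOWED.put(State.DISCONNECTED, EnumSet.of(State.CONNECTED, State.CLOSED));
        // CLOSED is terminal: the Client is destroyed.
        ALLOWED.put(State.CLOSED, EnumSet.noneOf(State.class));
    }

    private State state = State.NEW;

    synchronized State state() {
        return state;
    }

    // Applies the transition if the table allows it; returns false otherwise.
    synchronized boolean transition(State next) {
        if (!ALLOWED.get(state).contains(next)) {
            return false;
        }
        state = next;
        return true;
    }

    // The flusher only sends while CONNECTED.
    synchronized boolean canFlush() {
        return state == State.CONNECTED;
    }
}
```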
    
    That is the main change in this patch. I am very sorry for all of this. As a newcomer to open source, I will try my best to do better.
    Thanks, all.




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59317503
  
    For item 3, we can move that to a sub-task.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-81735980
  
    Yes, this pull request can be closed.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-72860304
  
    @miguno I tested this on my cluster just now.
    The topology is simpler, "spout ----> bolt0 ----> bolt1", and each component has only one executor.
    I didn't encounter the situation you described. First I killed bolt1, and after 2 seconds bolt0 knew that bolt1 had died and closed the connection. Then, after 31 s, bolt0 started connecting to another worker that contains bolt1.
    After bolt0 connected to the new bolt1, the new bolt1 received tuples immediately; I could see this through the acked count in the UI.
    
    1. First I killed bolt1 at 21:48:07;
    2. bolt0 learned that bolt1 had died at 21:48:08:
    ```
    2015-02-04 21:48:08 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813
    java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcher.read0(Native Method) ~[na:1.6.0_37]
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21) ~[na:1.6.0_37]
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198) ~[na:1.6.0_37]
            at sun.nio.ch.IOUtil.read(IOUtil.java:166) ~[na:1.6.0_37]
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:245) ~[na:1.6.0_37]
            at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:322) ~[netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281) ~[netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201) ~[netty-3.2.2.Final.jar:na]
            at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) [netty-3.2.2.Final.jar:na]
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
            at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    2015-02-04 21:48:08 b.s.m.n.Client [INFO] failed to send requests to lg-hadoop-tst-st04.bj/10.2.201.70:42813:
    java.nio.channels.ClosedChannelException: null
            at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:629) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:605) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:356) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) [netty-3.2.2.Final.jar:na]
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
            at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    ```
    Then bolt0 started reconnecting to bolt1 and stopped sending messages to it:
    ```
    2015-02-04 21:48:08 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 1 messages...
    2015-02-04 21:48:08 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813... [0]
    2015-02-04 21:48:08 b.s.m.n.Client [INFO] failed to send requests to lg-hadoop-tst-st04.bj/10.2.201.70:42813:
    java.nio.channels.ClosedChannelException: null
            at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:629) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.close(NioWorker.java:605) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:356) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.2.2.Final.jar:na]
            at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46) [netty-3.2.2.Final.jar:na]
            at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
            at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    ```
    After 30 reconnection attempts, bolt0 closed this connection:
    ```
    2015-02-04 21:48:45 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813... [29]
    2015-02-04 21:48:48 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813... [30]
    2015-02-04 21:48:49 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-lg-hadoop-tst-st04.bj/10.2.201.70:42813
    ```
    3. bolt0 started connecting to the new bolt1 at 21:48:49:
    ```
    2015-02-04 21:48:49 b.s.m.n.Client [INFO] New Netty Client, connect to lg-hadoop-tst-st01.bj, 42811, config: , buffer_size: 5242880
    2015-02-04 21:48:49 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st01.bj/10.2.201.65:42811... [0]
    2015-02-04 21:48:49 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-lg-hadoop-tst-st01.bj/10.2.201.65:42811, [id: 0x65616864, /10.2.201.68:58243 => lg-hadoop-tst-st01.bj/10.2.201.65:42811]
    ```
    
    @miguno If you send me your Storm code after the merge, I would be very glad to test it again.




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73905601
  
    Regarding the TODO (how to handle messages if the channel is not writable):
    
    I'd prefer to keep the current code as is in this regard, i.e. keep the TODO in place.  We should first finish this pull request, and then address the TODO in a separate Storm ticket.
    
    What do you think?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73527901
  
    I have improved the patch in this pull request, which particularly meant modifications to
    
    The updated patch is available at:
    * https://github.com/miguno/storm/commits/0.10.0-SNAPSHOT-STORM-392-miguno-merge
    * The relevant commit is https://github.com/miguno/storm/commit/8ebaaf8dbc63df3c2691e0cc3ac5102af7721ec3.
    
    Code lineage: This improved patch includes the latest Storm 0.10.0-SNAPSHOT code as of today, Feb 09.  I used the patch in this pull request (#268) as the basis, added my changes, and then pulled in the latest upstream Storm changes for 0.10.0-SNAPSHOT via `git merge upstream/master`.
    
    High-level changes:
    
    - The great majority of the work was refactoring [Client.java](https://github.com/apache/storm/blob/master/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java) in the Netty messaging backend.  For instance, I removed unused imports and obsolete code, clarified variables/methods, consolidated duplicate code, added documentation, improved logging (which also helped me a lot while I was bug-hunting), etc.
    - I also introduced a few functional changes to the code in order to fix the issue at hand.  Notably, the new code can properly detect disconnects / channel loss and trigger reconnect events.  This also meant that the new code is now more consistently verifying whether a channel from the client to the server is truly `CONNECTED`. Also, we're now using a `ListenableFuture` instead of a `Runnable` to handle the connection/reconnection logic, which leads to cleaner and more readable code IMHO.
    - I updated [netty_unit_test.clj](https://github.com/apache/storm/blob/master/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj) so that all tests also wait until the Netty server and client instances are ready (i.e. all of their connections are ready).  This change was needed because the original+improved STORM-329 patches modify Storm so that it will ensure the readiness of server and client instances during the initial startup of a topology, but the Netty unit tests are test-driving servers and clients directly, so the tests couldn't rely on Storm's help.
    
    Testing:
    - The improved patch passes Storm's full test suite.
    - I have successfully tested a patched version of Storm 0.10.0-SNAPSHOT (code as of today) in a Storm cluster via the aforementioned storm-bolt-of-death topology.
        - With the original patch, restarted bolt instances would not receive new incoming data.  With the improved patch the bolt instances start processing new data immediately (as expected).
    
    Next steps:
    - I would appreciate any feedback from other developers on this improved patch.  I'd particularly appreciate performance tests in addition to functional tests.  (I do not expect a performance degradation, but of course I might be wrong.)
    - The storm-bolt-of-death topology to trigger the cascading failure should be available by tomorrow for your testing convenience.
    
    Many thanks again to @tedxia and @clockfly for the original patch, which covered a lot (most?) of work already.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59529389
  
    I have a question (maybe a comment) about your PR.
    (I don't know Storm deeply, so this could be wrong. Please correct me if it is!)
    
    When we enqueue tuples in the Client, the queued tuples seem to be discarded when a worker goes down, Nimbus reassigns its tasks to another worker, and the source worker finally updates its task-to-socket mapping.
    But if we enqueue tuples in the Drainer instead, the queued tuples could be sent to the new worker once the task-to-socket cache is updated.
    If I'm right, it would be better to place the flusher in TransferDrainer.
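    
    To make the suggestion concrete, here is a minimal sketch, assuming the drainer buffers outgoing messages keyed by destination task id and looks up the task-to-connection mapping only at flush time. `DrainerSketch` and `Connection` are hypothetical names for illustration, not Storm's actual TransferDrainer or Client API.
    
    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical stand-in for a connection to one remote worker.
    class Connection {
        final String endpoint;
        final List<String> sent = new ArrayList<>();
    
        Connection(String endpoint) { this.endpoint = endpoint; }
    
        void send(List<String> batch) { sent.addAll(batch); }
    }
    
    // Hypothetical drainer that buffers by task id instead of by connection.
    class DrainerSketch {
        private final Map<Integer, List<String>> pendingByTask = new HashMap<>();
    
        void enqueue(int taskId, String message) {
            pendingByTask.computeIfAbsent(taskId, k -> new ArrayList<>()).add(message);
        }
    
        // The task -> connection lookup happens here, not at enqueue time, so a
        // reassignment that happened in between routes buffered messages to the
        // new worker. Tasks without a current connection keep their messages.
        void flush(Map<Integer, Connection> taskToConnection) {
            Iterator<Map.Entry<Integer, List<String>>> it = pendingByTask.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, List<String>> entry = it.next();
                Connection conn = taskToConnection.get(entry.getKey());
                if (conn != null) {
                    conn.send(entry.getValue());
                    it.remove();
                }
            }
        }
    
        int pendingCount() {
            int n = 0;
            for (List<String> msgs : pendingByTask.values()) {
                n += msgs.size();
            }
            return n;
        }
    }
    ```
    
    Because the mapping is resolved at flush time, messages enqueued before a reassignment go to whichever worker owns the task when the flush happens; in this sketch, messages for a task with no known connection simply stay buffered.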



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-60871259
  
    ```
     If worker doesn't get update of B from zookeeper after 300 reconnection retries, should we exit the worker or let worker continues to work?
    ```
    
    The current approach is:
     - If worker A gets the update about B from ZooKeeper, it aborts the reconnection, and worker A stays alive and keeps working.
     - If A doesn't learn from ZooKeeper that B is dead, then A exits once the 300 reconnection retries time out. In my opinion, there is no way to recover other than exiting, because a) A believes it must have a live connection to B, since the application tells it so, and b) A cannot set up a connection to B after exhausting every effort.
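    
    The two outcomes above can be sketched as follows. This is a hypothetical illustration: `ConnectAttempt` stands in for one connection attempt and `targetRemovedInZk` for the ZooKeeper watch, and the backoff sleep between attempts is omitted. Only the limit of 300 retries comes from this discussion.
    
    ```java
    import java.util.function.BooleanSupplier;
    
    // Possible results of worker A trying to reach worker B.
    enum ReconnectOutcome { CONNECTED, ABORTED_BY_ZK_UPDATE, RETRIES_EXHAUSTED }
    
    // Hypothetical single connection attempt; returns true on success.
    interface ConnectAttempt {
        boolean tryConnect();
    }
    
    class ReconnectPolicySketch {
        static ReconnectOutcome reconnect(ConnectAttempt attempt,
                                          BooleanSupplier targetRemovedInZk,
                                          int maxRetries) {
            for (int i = 0; i < maxRetries; i++) {
                // An assignment update from ZooKeeper means B is gone: stop retrying.
                if (targetRemovedInZk.getAsBoolean()) {
                    return ReconnectOutcome.ABORTED_BY_ZK_UPDATE;
                }
                if (attempt.tryConnect()) {
                    return ReconnectOutcome.CONNECTED;
                }
                // A real client would back off (sleep) before the next attempt.
            }
            return ReconnectOutcome.RETRIES_EXHAUSTED;
        }
    
        // A keeps running unless it exhausted every retry while still believing
        // it must have a live connection to B.
        static boolean workerShouldExit(ReconnectOutcome outcome) {
            return outcome == ReconnectOutcome.RETRIES_EXHAUSTED;
        }
    }
    ```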



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by harshach <gi...@git.apache.org>.
Github user harshach commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-58608071
  
    @tedxia Is it possible to add some specific unit tests for these changes?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-58609408
  
    -1
    
    You need to explain these changes more, especially the changes to worker.clj



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-65763659
  
    Thanks Ted,
    
    Yes, we need a fine-grained lock on `synchronized connect()`.
    
    I see you changed it to schedule a Runnable. That may cause a deadlock, as schedule is a blocking operation: "connect() waits for threadpool.schedule() to release slots, and threadpool.schedule() waits for connect() to exit".
    
    Looking forward to your update on this.
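    
    One common way to break a cycle like the one described above is to never hold the `connect()` monitor while handing work to the thread pool, and to use an atomic flag so that at most one reconnect task is queued at a time. The sketch below is a hypothetical illustration of that pattern with `java.util.concurrent`; it is not the code in this pull request.
    
    ```java
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    
    class ReconnectSchedulerSketch {
        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();
        // True while a reconnect task is queued but has not started yet.
        private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
        final AtomicInteger attempts = new AtomicInteger();
    
        // Deliberately not synchronized: no caller holds a lock that the
        // scheduled task would later need, so the two cannot wait on each other.
        void scheduleReconnect(long delayMs) {
            if (reconnectPending.compareAndSet(false, true)) {
                scheduler.schedule(this::doReconnect, delayMs, TimeUnit.MILLISECONDS);
            }
        }
    
        private void doReconnect() {
            reconnectPending.set(false);
            attempts.incrementAndGet();
            // A real client would attempt the connection here and call
            // scheduleReconnect() again on failure.
        }
    
        void shutdown() {
            scheduler.shutdown(); // already-queued delayed tasks still run by default
            try {
                scheduler.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    ```
    
    Repeated failure notifications therefore collapse into one queued reconnect, and no thread waits on the pool while holding the client's lock.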



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-63424599
  
    @ptgoetz I have merged storm/master into this patch, and all tests pass except "storm-core/test/clj/backtype/storm/multilang_test.clj". The test results (after I removed multilang_test.clj) are:
    ```
    [INFO] Reactor Summary:
    [INFO] 
    [INFO] Storm ............................................. SUCCESS [1.340s]
    [INFO] maven-shade-clojure-transformer ................... SUCCESS [1.712s]
    [INFO] Storm Core ........................................ SUCCESS [6:04.103s]
    [INFO] storm-starter ..................................... SUCCESS [7.010s]
    [INFO] storm-kafka ....................................... SUCCESS [1:03.183s]
    [INFO] storm-hdfs ........................................ SUCCESS [2.056s]
    [INFO] storm-hbase ....................................... SUCCESS [2.186s]
    [INFO] Storm Binary Distribution ......................... SUCCESS [0.185s]
    [INFO] Storm Source Distribution ......................... SUCCESS [0.136s]
    ```
    
    When I ran multilang_test.clj, I got an exception like this:
    ```
    java.lang.Exception: Shell Process Exception: Exception in bolt: "\xE4" on US-ASCII - /usr/lib/ruby/1.9.1/json/common.rb:148:in `encode'\n/usr/lib/ruby/1.9.1/json/common.rb:148:in `initialize'\n/usr/lib/ruby/1.9.1/json/common.rb:148:in `new'\n/usr/lib/ruby/1.9.1/json/common.rb:148:in `parse'\n/tmp/81b49de0-4ee0-493a-afe0-6286e393fb14/supervisor/stormdist/test-1-1416288370/resources/storm.rb:39:in `read_message'\n/tmp/81b49de0-4ee0-493a-afe0-6286e393fb14/supervisor/stormdist/test-1-1416288370/resources/storm.rb:57:in `read_command'\n/tmp/81b49de0-4ee0-493a-afe0-6286e393fb14/supervisor/stormdist/test-1-1416288370/resources/storm.rb:190:in `run'\ntester_bolt.rb:37:in `<main>'
            at backtype.storm.task.ShellBolt.handleError(ShellBolt.java:188) [classes/:na]
            at backtype.storm.task.ShellBolt.access$1100(ShellBolt.java:69) [classes/:na]
            at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:331) [classes/:na]
            at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    124526 [Thread-1209] ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died.
    java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
    Serializer Exception:
    
    
            at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[classes/:na]
            at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[classes/:na]
            at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    124527 [Thread-1209] ERROR backtype.storm.daemon.executor - 
    java.lang.RuntimeException: backtype.storm.multilang.NoOutputException: Pipe to subprocess seems to be broken! No output read.
    Serializer Exception:
    
    
            at backtype.storm.utils.ShellProcess.readShellMsg(ShellProcess.java:101) ~[classes/:na]
            at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:318) ~[classes/:na]
            at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    ```
    When I ran multilang_test.clj on storm/master, I got the same exception, so I think this may be a problem with my personal environment. Can you merge this into the 0.9.3 branch and run the tests again? Thanks a lot.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61287291
  
    During this test, I found there was message loss, but it was not caused by this new patch.
    
    I traced it back, and it seems the fault was introduced by changes between 0.9.2 and 0.9.3-rc1.
    I am still trying to find the root cause for this error.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-67047625
  
    @nathanmarz ,
    
    I'd like to explain why I need to change worker.clj.
    
    This was also motivated by a legacy TODO in zmq.clj.
    https://github.com/nathanmarz/storm/blob/0.8.2/src/clj/backtype/storm/messaging/zmq.clj#L43
    ```
      (send [this task message]
    ...
        (mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
    ```
    As we can see, the ZeroMQ transport sends messages in a non-blocking way.
    
    If I understand this TODO correctly, it wants the following:
    a) When the target worker is not booted yet, the source worker should not send messages to it. Otherwise, as there is no backpressure, messages will be lost during the bootup phase. If it is an unacked topology, the message loss is permanent; if it is an acked topology, we will need to do unnecessary replays.
    b) When the target worker disappears in the middle (crashes?), the source worker should drop the messages directly.
    
    The problem is that the transport layer doesn't know by itself whether the target worker is "booting up" or "crashed in the running phase", so it cannot smartly choose between "back pressure" and "drop".
    
    If the transport simply chooses "block", that is good for the "booting up" phase but bad for the "running phase": if one connection is down, it may block messages sent to other connections.
    If the transport simply chooses "drop", that is good for the "running phase" but bad for the "booting up" phase: if the target worker boots 30 seconds later, all messages sent during those 30 seconds will be dropped.
    
    The changes in "worker.clj" are targeted at this problem.
    The worker knows when the target worker connections are ready.
    In the bootup phase, worker.clj waits until the target worker connections are ready, and only then activates the source worker tasks.
    In the "runtime phase", the transport simply drops the messages if the target worker crashes in the middle.
    
    There will be several benefits:
    1. During cluster bootup, for unacked topologies, there will be no unexpected message loss.
    2. During cluster bootup, for acked topologies, it takes less time to reach normal throughput, as there is no message loss, timeout, or replay.
    3. For the transport layer, the design is simplified: we can just drop the messages if the target worker is not available.
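    
    The bootup rule described above (wait for every outbound connection, then activate tasks) can be sketched as a polling loop with a timeout. `WorkerStartupSketch`, the `BooleanSupplier` readiness checks, and the 10 ms poll interval are assumptions for illustration; the actual change lives in worker.clj (Clojure).
    
    ```java
    import java.util.List;
    import java.util.function.BooleanSupplier;
    
    class WorkerStartupSketch {
        // Polls every readiness check until all report true or the timeout expires.
        static boolean awaitConnectionsReady(List<BooleanSupplier> connections,
                                             long timeoutMs, long pollMs) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (true) {
                boolean allReady = true;
                for (BooleanSupplier ready : connections) {
                    if (!ready.getAsBoolean()) {
                        allReady = false;
                        break;
                    }
                }
                if (allReady) {
                    return true;
                }
                if (System.currentTimeMillis() >= deadline) {
                    return false;
                }
                try {
                    Thread.sleep(pollMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
    
        // Bootup phase: spouts/bolts are activated only after all connections are ready.
        static boolean startWorker(List<BooleanSupplier> connections, Runnable activateTasks,
                                   long timeoutMs) {
            if (!awaitConnectionsReady(connections, timeoutMs, 10)) {
                return false;
            }
            activateTasks.run();
            return true;
        }
    }
    ```
    
    Once tasks are active, the transport no longer waits: a connection that later goes away has its messages dropped, which is the "runtime phase" behavior described above.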



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59674932
  
    To retain the comments, I modified Ted's branch.
    
    The update tries to solve these three problems:
    1. When a target worker is down, the source worker should be aware of this and should not crash.
    The approach we use here is as follows:
       a. The target worker goes down.
       b. The source worker begins reconnecting (it can retry up to "storm.messaging.netty.max_retries" times).
       c. The source worker learns that the target worker is down (by watching ZooKeeper nodes).
       d. The source worker calls client.close() on the target's client; client.close() now sets the `closing` flag to true asynchronously, which breaks the reconnection loop from step b.
       e. The reconnection is aborted. No exception is thrown.
    
    2. When a target worker is down, data sent to other target workers should not be blocked.
    The approach we currently use is to drop messages when the connection to the target worker is not available.
    
    3. There is a side effect of the solution in 2: during topology setup, will we drop messages while some target worker connection is still being established?
    For this problem, we now wait for the connections to be ready before bringing up spouts/bolts during worker startup. So, during worker startup, the spouts/bolts are not activated until the connections to all target workers are established.
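    
    Steps b through e and the drop rule in item 2 can be sketched as a client whose retry loop checks a `volatile` closing flag, so that `close()` only flips the flag and returns. `ClientSketch` and its `connectOnce` callback are hypothetical; only the `storm.messaging.netty.max_retries` setting is taken from the text above.
    
    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.BooleanSupplier;
    
    class ClientSketch {
        private final int maxRetries;              // e.g. storm.messaging.netty.max_retries
        private final BooleanSupplier connectOnce; // hypothetical single connection attempt
        private volatile boolean closing = false;
        private volatile boolean connected = false;
        final AtomicInteger dropped = new AtomicInteger();
        final AtomicInteger delivered = new AtomicInteger();
    
        ClientSketch(int maxRetries, BooleanSupplier connectOnce) {
            this.maxRetries = maxRetries;
            this.connectOnce = connectOnce;
        }
    
        // Step b: retry up to maxRetries; step e: stop quietly once close() was called.
        boolean reconnect() {
            for (int attempt = 0; attempt < maxRetries && !closing; attempt++) {
                if (connectOnce.getAsBoolean()) {
                    connected = true;
                    return true;
                }
            }
            return false; // aborted or exhausted, no exception thrown
        }
    
        // Item 2: never block on an unavailable target; drop the message instead.
        void send(String message) {
            if (!connected || closing) {
                dropped.incrementAndGet();
                return;
            }
            delivered.incrementAndGet(); // a real client would write to the channel here
        }
    
        // Step d: only sets flags, so it returns immediately even while another
        // thread is inside reconnect().
        void close() {
            closing = true;
            connected = false;
        }
    }
    ```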
    
    
      
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-63430061
  
    ```
    I tested this by applying it to the 0.9.3 branch and found problems with the unit tests (never-ending cycle of zookeeper reconnects, tests never complete).
    ```
    @ptgoetz Can you figure out which test causes this? I will check it again.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61404264
  
    @clockfly Sorry to bother you, but how did you detect lost tuples with storm-perf-test?
    Lost tuples should be taken seriously, so I think we should separate this from the current issue and roll back STORM-350 immediately, before releasing 0.9.3-rc2, if it is always reproducible.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-64870554
  
    @clockfly Could you have a look at this and test the newest patch in your environment? Thanks a lot.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73910187
  
    So, I don't think your TODO comment describes an issue; it is actually designed this way. What do you think?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-72652704
  
    Some additional feedback regarding Storm's behavior in the face of failures, with and without this patch.  This summary is slightly simplified to make it a shorter read.
    
    ### Without this patch
    
    * Tested Storm versions: 0.9.2, 0.9.3, 0.10.0-SNAPSHOT
    * Configuration: default Storm settings (cf. `conf/defaults.yaml`)
    
    Here, we can confirm the cascading failure previously described in this pull request and the JIRA ticket.
    
    Consider a simple topology such as:
    
    ```
                    +-----> bolt1 -----> bolt2
                    |
         spout -----+
                    |
                    +-----> bolt3
    ```
    
    * If the instances of `bolt2` die (say, because of a runtime exception), then `bolt1` instances will enter a reconnect-until-success-or-die loop.
        * If Storm decides to place the restarted `bolt2` instances on different workers (read: machine+port pairs), then `bolt1` will eventually die.
        * If Storm places the restarted `bolt2` instances on the same workers, then the `bolt1` instances will not die because one of their reconnection attempts will succeed, and normal operation will resume.
    * If `bolt1` died, too, then we enter the same reconnect-until-success-or-die loop in the `spout` instances.  Hence the cascading nature of the failure.
    
    On top of that, we also observed the following later phases of this cascading failure in larger clusters, where each phase is less likely to happen than the previous one:
    
    1. Other spouts/bolts of the same topology -- "friends of friends" and so on -- may enter such loops.  In the example above, `bolt3` may start to die, too.
    2. Eventually the full topology may become dysfunctional, a zombie: not dead but not alive either.
    3. Other topologies in the cluster may then become zombies, too.
    4. The full Storm cluster may enter a zombie state.  This state may even turn out to be unrecoverable without a full cluster restart.
    
    > Funny anecdote: Because of a race condition scenario you may even observe that Storm spouts/bolts will begin to talk to the wrong peers, e.g. `spout` will talk directly to `bolt2`, even though this violates the wiring of our topology.
    
    ### With this patch
    
    * Tested Storm versions: We primarily tested a patched 0.10.0-SNAPSHOT version but also tested a patched 0.9.3 briefly.
    * Configuration: default Storm settings (cf. `conf/defaults.yaml`)
    
    Here, the behavior is different.  We didn't observe cascading failures anymore at the expense of "silent data loss" (see below).
    
    Again, consider this example topology:
    
    ```
                    +-----> bolt1 -----> bolt2
                    |
         spout -----+
                    |
                    +-----> bolt3
    ```
    
    With the patch, when the instances of `bolt2` die, the instances of `bolt1` will continue to run; i.e. they will not enter a reconnect-until-success-or-die loop anymore (which, particularly the not-dying part, was the purpose of the patch).
    
    **bolt2 behavior**
    
    We wrote a special-purpose "storm-bolt-of-death" topology that would consistently throw runtime exceptions in `bolt2` (aka the bolt of death) whenever it received an input tuple.  The following example shows the timeline of `bolt2` crashing intentionally.  We observed that once the `bolt2` instances were restarted (and Storm would typically restart the instances on the same workers, read: machine+port combinations), they would not receive any new input tuples even though their upstream peer `bolt1` was up and running and constantly emitting output tuples.
    
    Summary of the `bolt2` log snippet below:
    
    * This `bolt2` instance dies at `12:49:21`, followed by an immediate restart (here: on the same machine+port).
    * The `bolt2` instance is up and running at `12:49:32`, but it would not process any new input tuple until `52 mins` later.
        * In our testing we found that the restarted `bolt2` instances took a consistent `52 mins` (!) to receive their first, "new" input tuple from `bolt1`.
    
    ```
    # New input tuple => let's crash!   Now the shutdown procedure begins.
    
    2015-02-03 12:49:21 c.v.s.t.s.b.BoltOfDeath [ERROR] Intentionally throwing this exception to trigger bolt failures
    2015-02-03 12:49:21 b.s.util [ERROR] Async loop died!
    java.lang.RuntimeException: java.lang.RuntimeException: Intentionally throwing this exception to trigger bolt failures
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$fn__6773$fn__6786$fn__6837.invoke(executor.clj:798) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:472) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    Caused by: java.lang.RuntimeException: Intentionally throwing this exception to trigger bolt failures
    	at com.verisign.storm.tools.sbod.bolts.BoltOfDeath.execute(BoltOfDeath.scala:77) ~[stormjar.jar:0.1.0-SNAPSHOT]
    	at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$fn__6773$tuple_action_fn__6775.invoke(executor.clj:660) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$mk_task_receiver$fn__6696.invoke(executor.clj:416) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$clojure_handler$reify__871.onEvent(disruptor.clj:58) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	... 6 common frames omitted
    2015-02-03 12:49:21 b.s.d.executor [ERROR]
    java.lang.RuntimeException: java.lang.RuntimeException: Intentionally throwing this exception to trigger bolt failures
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$fn__6773$fn__6786$fn__6837.invoke(executor.clj:798) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:472) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    Caused by: java.lang.RuntimeException: Intentionally throwing this exception to trigger bolt failures
    	at com.verisign.storm.tools.sbod.bolts.BoltOfDeath.execute(BoltOfDeath.scala:77) ~[stormjar.jar:0.1.0-SNAPSHOT]
    	at backtype.storm.topology.BasicBoltExecutor.execute(BasicBoltExecutor.java:50) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$fn__6773$tuple_action_fn__6775.invoke(executor.clj:660) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$mk_task_receiver$fn__6696.invoke(executor.clj:416) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$clojure_handler$reify__871.onEvent(disruptor.clj:58) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	... 6 common frames omitted
    2015-02-03 12:49:21 b.s.util [ERROR] Halting process: ("Worker died")
    java.lang.RuntimeException: ("Worker died")
    	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:329) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.6.0.jar:na]
    	at backtype.storm.daemon.worker$fn__7196$fn__7197.invoke(worker.clj:536) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.executor$mk_executor_data$fn__6606$fn__6607.invoke(executor.clj:246) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:482) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:49:21 b.s.d.worker [INFO] Shutting down worker bolt-of-death-topology-1-1422964754 783bcc5b-b571-4c7e-94f4-72ff49edc35e 6702
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-supervisor2/10.0.0.102:6702
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-supervisor2/10.0.0.102:6702..., timeout: 600000ms, pendings: 0
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-supervisor1/10.0.0.101:6702
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-supervisor1/10.0.0.101:6702..., timeout: 600000ms, pendings: 0
    2015-02-03 12:49:21 b.s.d.worker [INFO] Shutting down receive thread
    2015-02-03 12:49:21 o.a.s.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (300). Pinning to 29
    2015-02-03 12:49:21 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [1000] the maxRetries [300]
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] New Netty Client, connect to supervisor3, 6702, config: , buffer_size: 5242880
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-supervisor3/127.0.1.1:6702... [0]
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-supervisor3/127.0.1.1:6702, [id: 0x8415ae96, /127.0.1.1:48837 => supervisor3/127.0.1.1:6702]
    2015-02-03 12:49:21 b.s.m.loader [INFO] Shutting down receiving-thread: [bolt-of-death-topology-1-1422964754, 6702]
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-supervisor3/127.0.1.1:6702
    2015-02-03 12:49:21 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-supervisor3/127.0.1.1:6702..., timeout: 600000ms, pendings: 0
    2015-02-03 12:49:21 b.s.m.loader [INFO] Waiting for receiving-thread:[bolt-of-death-topology-1-1422964754, 6702] to die
    2015-02-03 12:49:21 b.s.m.loader [INFO] Shutdown receiving-thread: [bolt-of-death-topology-1-1422964754, 6702]
    2015-02-03 12:49:21 b.s.d.worker [INFO] Shut down receive thread
    2015-02-03 12:49:21 b.s.d.worker [INFO] Terminating messaging context
    2015-02-03 12:49:21 b.s.d.worker [INFO] Shutting down executors
    2015-02-03 12:49:21 b.s.d.executor [INFO] Shutting down executor bolt-of-death-A2:[3 3]
    2015-02-03 12:49:21 b.s.util [INFO] Async loop interrupted!
    
    # Now the restart begins, which happened to be on the same machine+port.
    
    2015-02-03 12:49:28 o.a.s.z.ZooKeeper [INFO] Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
    2015-02-03 12:49:28 o.a.s.z.ZooKeeper [INFO] Client environment:host.name=supervisor3
    [...]
    2015-02-03 12:49:32 b.s.d.executor [INFO] Prepared bolt bolt-of-death-A2:(3)
    
    # Now the bolt instance would stay idle for the next 52 mins.
    # Only metrics related log messages were reported.
    
    2015-02-03 12:50:31 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor2/10.0.0.102:6702
    2015-02-03 12:50:31 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor1/10.0.0.101:6702
    2015-02-03 12:50:31 b.s.m.n.Server [INFO] Getting metrics for server on 6702
    2015-02-03 12:51:31 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor2/10.0.0.102:6702
    2015-02-03 12:51:31 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor1/10.0.0.101:6702
    2015-02-03 12:51:31 b.s.m.n.Server [INFO] Getting metrics for server on 6702
    [...]
    ```
    
    Until then, the restarted `bolt2` instances stayed idle, with logs like the following:
    
    ```
    # Restart-related messages end with the "I am prepared!" log line below.
    2015-02-03 11:58:52 b.s.d.executor [INFO] Prepared bolt bolt2:(3)
    
    # Then, for the next 52 minutes, only log messages relating to metrics were reported.
    2015-02-03 11:59:52 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor2/10.0.0.102:6702
    2015-02-03 11:59:52 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor1/10.0.0.101:6702
    2015-02-03 11:59:52 b.s.m.n.Server [INFO] Getting metrics for server on 6702
    2015-02-03 12:00:52 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor2/10.0.0.102:6702
    2015-02-03 12:00:52 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor1/10.0.0.101:6702
    2015-02-03 12:00:52 b.s.m.n.Server [INFO] Getting metrics for server on 6702
    2015-02-03 12:01:52 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor2/10.0.0.102:6702
    2015-02-03 12:01:52 b.s.m.n.Client [INFO] Getting metrics for connection to supervisor1/10.0.0.101:6702
    2015-02-03 12:01:52 b.s.m.n.Server [INFO] Getting metrics for server on 6702
    
    # 52 minutes after the bolt restart it finally started to process data again.
    2015-02-03 12:49:21 ...
    ```
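    The first `bolt2` log above shows the Netty client's reconnect policy: `baseSleepTimeMs [100]`, `maxSleepTimeMs [1000]` and `maxRetries [300]`. It also shows a warning from the shaded Curator `ExponentialBackoffRetry` that `maxRetries` was pinned to 29. To make those numbers concrete, here is a deterministic Java sketch of a bounded exponential backoff that uses them. The class name `BoundedBackoff` and the formula `min(max, base * 2^attempt)` are our own assumptions. Storm's `StormBoundedExponentialBackoffRetry` computes its delays with its own formula, which is not reproduced here.

    ```java
    /**
     * Simplified, deterministic bounded exponential backoff (illustration only):
     * delay(attempt) = min(maxSleepMs, baseSleepMs * 2^attempt).
     * Not Storm's actual retry policy.
     */
    public class BoundedBackoff {
        // Java masks long shift distances to 6 bits, so the exponent must be capped;
        // 29 mirrors the "Pinning to 29" warning in the log.
        private static final int MAX_EXPONENT = 29;

        private final long baseSleepMs;
        private final long maxSleepMs;
        private final int maxRetries;

        public BoundedBackoff(long baseSleepMs, long maxSleepMs, int maxRetries) {
            if (baseSleepMs <= 0 || maxSleepMs < baseSleepMs || maxRetries < 0) {
                throw new IllegalArgumentException("invalid backoff parameters");
            }
            this.baseSleepMs = baseSleepMs;
            this.maxSleepMs = maxSleepMs;
            this.maxRetries = maxRetries;
        }

        /** Delay before 0-based retry {@code attempt}, or -1 once all retries are used up. */
        public long delayMs(int attempt) {
            if (attempt < 0) {
                throw new IllegalArgumentException("attempt must be >= 0");
            }
            if (attempt >= maxRetries) {
                return -1; // retries exhausted: the caller gives up
            }
            int exponent = Math.min(attempt, MAX_EXPONENT);
            // Compare before shifting so that baseSleepMs << exponent can never overflow.
            if (baseSleepMs > (maxSleepMs >> exponent)) {
                return maxSleepMs;
            }
            return baseSleepMs << exponent;
        }

        /** Total time spent sleeping if every retry is used. */
        public long totalSleepMs() {
            long total = 0;
            for (int i = 0; i < maxRetries; i++) {
                total += delayMs(i);
            }
            return total;
        }

        public static void main(String[] args) {
            BoundedBackoff backoff = new BoundedBackoff(100, 1000, 300);
            for (int i = 0; i < 6; i++) {
                System.out.println("attempt " + i + ": " + backoff.delayMs(i) + " ms");
            }
            System.out.println("total: " + backoff.totalSleepMs() + " ms");
        }
    }
    ```

    Under this simplified model, the delays grow as 100, 200, 400 and 800 ms and then stay at the 1000 ms cap. Using all 300 retries would add up to 297,500 ms, or roughly five minutes of sleeping.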
    
    **bolt1 behavior**
    
    During this time the upstream peer `bolt1` happily reported an increasing number of emitted tuples, and there were no errors in the UI or in the logs.  Here is an example log snippet of `bolt1` at the time when `bolt2` died (`ForwarderBolt` is `bolt1`).
    
    * `bolt1` complains about a failed connection to `bolt2` at `12:52:24`, which is about `3 mins` after the `bolt2` instance died at `12:49:21`.
    * `bolt1` subsequently reports it re-established a connection to `bolt2` at `12:52:24` (the log timestamp granularity is 1 second).
        * `bolt1` reports 9 new output tuples but, if my understanding of the new patch is correct, this now happens asynchronously.
    * `bolt1` complains about another failed connection to `bolt2` at `12:52:25` (and another connection failure to a second instance of `bolt2` at `12:52:26`).
    * `bolt1` would then report new output tuples, but those would not reach the downstream `bolt2` instances until 52 minutes later.
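    The bullet points above compare timestamps across logs by hand. A small helper makes that arithmetic explicit. `LogGap` is a hypothetical utility of ours and is not part of Storm. It assumes that both timestamps use the `yyyy-MM-dd HH:mm:ss` format of these logs and are in the same time zone. It also assumes that the clocks on the machines running `bolt1` and `bolt2` are synchronized, which the logs themselves do not guarantee.

    ```java
    import java.time.Duration;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;

    /** Computes the gap between two log timestamps in the format used by these Storm logs. */
    public class LogGap {
        private static final DateTimeFormatter LOG_TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

        public static Duration between(String earlier, String later) {
            return Duration.between(LocalDateTime.parse(earlier, LOG_TS), LocalDateTime.parse(later, LOG_TS));
        }

        public static void main(String[] args) {
            // bolt2 dies -> bolt1 first reports a failed connection to it
            Duration detection = between("2015-02-03 12:49:21", "2015-02-03 12:52:24");
            // bolt2 dies -> restarted bolt2 instance is prepared again
            Duration restart = between("2015-02-03 12:49:21", "2015-02-03 12:49:32");
            System.out.println("detection gap: " + detection.getSeconds() + " s"); // prints "detection gap: 183 s"
            System.out.println("restart gap: " + restart.getSeconds() + " s");     // prints "restart gap: 11 s"
        }
    }
    ```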
    
    ```
    2015-02-03 12:52:24 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-supervisor3/10.0.0.103:6702
    java.nio.channels.ClosedChannelException: null
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:84) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:725) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:704) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:671) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.AbstractChannel.write(AbstractChannel.java:248) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.flushRequest(Client.java:398) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.send(Client.java:279) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086$fn__7087.invoke(worker.clj:351) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086.invoke(worker.clj:349) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$clojure_handler$reify__871.onEvent(disruptor.clj:58) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_loop_STAR_$fn__884.invoke(disruptor.clj:94) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:472) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:52:24 b.s.m.n.Client [INFO] failed to send requests to supervisor3/10.0.0.103:6702:
    java.nio.channels.ClosedChannelException: null
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:84) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:725) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:704) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:671) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.AbstractChannel.write(AbstractChannel.java:248) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.flushRequest(Client.java:398) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.send(Client.java:279) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086$fn__7087.invoke(worker.clj:351) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086.invoke(worker.clj:349) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$clojure_handler$reify__871.onEvent(disruptor.clj:58) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_loop_STAR_$fn__884.invoke(disruptor.clj:94) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:472) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:52:24 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-supervisor3/10.0.0.103:6702... [0]
    2015-02-03 12:52:24 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-supervisor3/10.0.0.103:6702, [id: 0xa58c119c, /10.0.0.102:44392 => supervisor3/10.0.0.103:6702]
    2015-02-03 12:52:24 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:24 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [bertels]
    2015-02-03 12:52:24 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [mike]
    2015-02-03 12:52:24 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:24 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [bertels]
    2015-02-03 12:52:24 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [golda]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [nathan]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [nathan]
    2015-02-03 12:52:25 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-supervisor3/10.0.0.103:6702
    java.io.IOException: Connection reset by peer
    	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.7.0_75]
    	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.7.0_75]
    	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.7.0_75]
    	at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.7.0_75]
    	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[na:1.7.0_75]
    	at org.apache.storm.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [jackson]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [nathan]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [jackson]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [mike]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [golda]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:25 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [bertels]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [mike]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [mike]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [golda]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [jackson]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [bertels]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [mike]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [golda]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [nathan]
    2015-02-03 12:52:26 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-supervisor4/10.0.0.104:6702
    java.nio.channels.ClosedChannelException: null
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:84) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:725) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:704) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:671) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.AbstractChannel.write(AbstractChannel.java:248) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.flushRequest(Client.java:398) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.send(Client.java:279) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086$fn__7087.invoke(worker.clj:351) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086.invoke(worker.clj:349) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$clojure_handler$reify__871.onEvent(disruptor.clj:58) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_loop_STAR_$fn__884.invoke(disruptor.clj:94) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:472) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) ~[clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:52:26 b.s.m.n.Client [INFO] failed to send requests to supervisor4/10.0.0.104:6702:
    java.nio.channels.ClosedChannelException: null
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.cleanUpWriteBuffer(AbstractNioWorker.java:433) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.writeFromUserCode(AbstractNioWorker.java:128) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.NioClientSocketPipelineSink.eventSunk(NioClientSocketPipelineSink.java:84) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendDownstream(DefaultChannelPipeline.java:779) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:725) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:59) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:591) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.DefaultChannelPipeline.sendDownstream(DefaultChannelPipeline.java:582) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:704) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.Channels.write(Channels.java:671) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.AbstractChannel.write(AbstractChannel.java:248) ~[storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.flushRequest(Client.java:398) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.messaging.netty.Client.send(Client.java:279) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.TransferDrainer.send(TransferDrainer.java:54) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086$fn__7087.invoke(worker.clj:351) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.daemon.worker$mk_transfer_tuples_handler$fn__7086.invoke(worker.clj:349) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$clojure_handler$reify__871.onEvent(disruptor.clj:58) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.disruptor$consume_loop_STAR_$fn__884.invoke(disruptor.clj:94) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at backtype.storm.util$async_loop$fn__550.invoke(util.clj:472) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at clojure.lang.AFn.run(AFn.java:22) [clojure-1.6.0.jar:na]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:52:26 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-supervisor4/10.0.0.104:6702... [0]
    2015-02-03 12:52:26 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-supervisor4/10.0.0.104:6702, [id: 0xa9bd1839, /10.0.0.102:37751 => supervisor4/10.0.0.104:6702]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [jackson]
    2015-02-03 12:52:26 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [mike]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [golda]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [bertels]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [bertels]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [bertels]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [nathan]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [jackson]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [jackson]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [nathan]
    2015-02-03 12:52:27 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [bertels]
    2015-02-03 12:52:28 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [mike]
    2015-02-03 12:52:28 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [golda]
    2015-02-03 12:52:28 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [mike]
    2015-02-03 12:52:28 b.s.m.n.StormClientErrorHandler [INFO] Connection failed Netty-Client-supervisor4/10.0.0.104:6702
    java.io.IOException: Connection reset by peer
    	at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.7.0_75]
    	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.7.0_75]
    	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.7.0_75]
    	at sun.nio.ch.IOUtil.read(IOUtil.java:192) ~[na:1.7.0_75]
    	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379) ~[na:1.7.0_75]
    	at org.apache.storm.netty.channel.socket.nio.NioWorker.read(NioWorker.java:64) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.process(AbstractNioWorker.java:108) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:89) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.channel.socket.nio.NioWorker.run(NioWorker.java:178) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at org.apache.storm.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [storm-core-0.10.0-SNAPSHOT.jar:0.10.0-SNAPSHOT]
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_75]
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_75]
    	at java.lang.Thread.run(Thread.java:745) [na:1.7.0_75]
    2015-02-03 12:52:28 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [golda]
    2015-02-03 12:52:28 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:10, stream: default, id: {}, [mike]
    2015-02-03 12:52:28 c.v.s.t.s.b.ForwarderBolt [INFO] Forwarding tuple source: wordSpout:9, stream: default, id: {}, [jackson]
    
    
    # From this point on the bolt would continuously report new output tuples
    # (cf. the "forwarding tuple" log messages above) until its downstream peer
    # would come fully back to life (read: 52 mins after restarts).
    ```
    
    ### Current conclusion
    
    At the moment this patch does not seem to improve the situation. (I wouldn't rule out that we screwed up merging the patch into the current 0.10.0 master version, but we didn't run into any merge conflicts, so I'd say we applied the patch correctly.) Silent data loss is as bad as, and arguably worse than, a cascading failure.
    
    PS: We're about to share the storm-bolt-of-death topology, which might help with reproducing this issue in a deterministic way for the various people involved in this thread.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61287622
  
    STORM-510 is also taken care of, since we don't block the connections to other workers.
    
    Now, Client.send(msgs) will never block.
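    A minimal sketch of this non-blocking behavior, assuming a hypothetical single-threaded client per destination that drops (and counts) messages while its connection is down instead of waiting for the reconnect to finish. `DroppingClient`, `setConnected` and `droppedCount` are illustrative names, not Storm's actual `Client` API.
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    // Hypothetical, single-threaded stand-in for a per-destination client.
    // send() never waits for a reconnect: while the connection is down it drops
    // the batch and returns at once, so one dead destination cannot stall sends
    // to other, healthy destinations.
    class DroppingClient {
        private boolean connected;
        private final List<String> sent = new ArrayList<>();
        private long dropped;
    
        void setConnected(boolean connected) {
            this.connected = connected;
        }
    
        void send(Iterator<String> msgs) {
            if (!connected) {
                // Drain and drop. With acking enabled, a reliable spout can
                // replay these after its message timeout; otherwise they are lost.
                while (msgs.hasNext()) {
                    msgs.next();
                    dropped++;
                }
                return;
            }
            while (msgs.hasNext()) {
                sent.add(msgs.next()); // stands in for writing to the channel
            }
        }
    
        List<String> sent() {
            return sent;
        }
    
        long droppedCount() {
            return dropped;
        }
    }
    ```
    
    Because `send()` returns immediately in both branches, the trade-off is explicit: latency for healthy destinations is protected, and messages to the dead destination depend on acking for recovery.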



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73847544
  
    I think if the channel is not `WRITABLE` when we are trying to flush a full message batch (or more than one, given that it's part of the `while` loop), then we'll drop those messages until the channel becomes `WRITABLE` again.  If acking is enabled, replaying may kick in if the spout/source supports it;  in all other cases the messages will be lost forever.
    
    ```java
            // Collect messages into batches (to optimize network throughput), then flush them.
            while (msgs.hasNext()) {
                TaskMessage message = msgs.next();
                if (messageBatch == null) {
                    messageBatch = new MessageBatch(messageBatchSize);
                }
    
                messageBatch.add(message);
                // TODO: What shall we do if the channel is not writable?
                if (messageBatch.isFull()) {
                    MessageBatch toBeFlushed = messageBatch;
                    flushMessages(channel, toBeFlushed);
                    messageBatch = null;
                }
            }
    ```
    The code after the `while` loop will keep any remaining messages -- up to a full batch size -- in memory via the `messageBatch` field in case the channel is not `WRITABLE`:
    
    ```java
            // Handle any remaining messages in case the "last" batch was not full.
            if (containsMessages(messageBatch)) {
                if (connectionEstablished(channel) && channel.isWritable()) {
                    // ...snip...
                } else { // <<< if not WRITABLE, then we keep the remaining messages in memory and try background flushing
                    resumeBackgroundFlushing();
                    nextBackgroundFlushTimeMs.set(nowMillis() + flushCheckIntervalMs);
                }
            }
    ```
    
    To illustrate the current behavior (note: this behavior is the same before and after the changes in this pull request -- the PR does not modify this part of the logic), consider the following example.
    
    * Message batch size: `10`
    * `send()` receives a list of `35` messages.
    * The channel becomes unwritable at the time we are processing message 13.
    
    In this case the code would try to process and split/batch the messages as follows:
    
    ```
    10 --> 10 --> 10 --> 5
    \             /      |
     \           /       |
      handled in      handled in if-else after while
      while loop
    ```
    
    Here, the first batch of `10` messages would be successfully written to the channel because the latter is still `WRITABLE`.  The channel would (given our assumption) then become unwritable while we're batching up messages 11-20, and subsequently we would not be able to write messages 11-20 and messages 21-30 to the channel.  Messages 31-35 would be kept in memory (via the `messageBatch` field), and we'd begin background flushing to write those 5 messages.
    
    So in this scenario:
    - Messages 1-10 would be written successfully.
    - Messages 11-20 and 21-30 would be dropped.  Acking configuration and spout/source behavior would determine whether this dropping would cause message loss or replaying.
    - Messages 31-35 would be buffered in memory (i.e. not in Netty's internal write buffer), and background flushing may or may not eventually succeed in writing them.
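    The walk-through above can be reproduced with a small simulation. It is a hypothetical model, not Storm's `Client` code: it encodes this comment's premise that a full batch flushed while the channel is unwritable is dropped, and that the trailing partial batch is kept in memory. `BatchingScenario` and its `unwritableFrom` parameter (the message number at which the channel stops being writable) are illustrative names.
    
    ```java
    import java.util.ArrayList;
    import java.util.List;
    
    // Hypothetical model of the batching scenario (35 messages, batch size 10,
    // channel unwritable from message 13 onwards).
    class BatchingScenario {
        final List<Integer> written = new ArrayList<>();
        final List<Integer> dropped = new ArrayList<>();
        final List<Integer> buffered = new ArrayList<>();
    
        // Messages are numbered 1..messageCount. Writability is checked when a
        // full batch is flushed, i.e. at the batch's last message.
        BatchingScenario(int messageCount, int batchSize, int unwritableFrom) {
            List<Integer> batch = new ArrayList<>();
            for (int msg = 1; msg <= messageCount; msg++) {
                batch.add(msg);
                if (batch.size() == batchSize) {
                    if (msg < unwritableFrom) {
                        written.addAll(batch);  // flushed inside the while loop
                    } else {
                        dropped.addAll(batch);  // the premise under discussion
                    }
                    batch = new ArrayList<>();
                }
            }
            // Leftover partial batch: kept in memory for background flushing.
            buffered.addAll(batch);
        }
    }
    ```
    
    With `new BatchingScenario(35, 10, 13)`, `written` holds messages 1-10, `dropped` holds 11-30 and `buffered` holds 31-35, matching the three bullets above.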
    
    Is that a correct summary?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73909944
  
    @miguno,
    
    I am sure you are aware that you can still send data to this channel even if channel.isWritable returns false.
    
    check http://netty.io/4.0/api/io/netty/channel/Channel.html#isWritable()
    
    boolean isWritable()
    
    Returns true if and only if the I/O thread will perform the requested write operation **immediately**. Any write requests made when this method returns false are **queued** until the I/O thread is ready to process the queued write requests.
    
    Whether or not we do the isWritable check has nothing to do with message loss; the isWritable check is used purely to optimize performance for small messages.
    
    case1: If we have a large enough batch, then we will just flush it to Netty's internal queue. Netty will flush the pending data in the queue to the wire when the wire is not that busy.
    
    case2: If we don't have a large enough batch, and the wire is busy, then the Storm netty Client will wait for a while, buffer more messages in the batch, and set a timer to flush later.
    
    case3: If we don't have a large enough batch, and the wire is free, then Storm Client will flush immediately, so that we have better latency.
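    The three cases can be summarized as a small decision function. This is a hypothetical sketch: `FlushDecision`, `FlushPolicy` and `decide` are illustrative names, not Storm's API. Per the Javadoc quoted above, writes issued while `isWritable()` is false are queued by Netty, so none of the branches drops messages.
    
    ```java
    // Hypothetical summary of case1-case3 above.
    enum FlushDecision {
        FLUSH_TO_NETTY_QUEUE,   // case1: full batch; Netty queues it if the wire is busy
        BUFFER_AND_FLUSH_LATER, // case2: partial batch, busy wire; keep batching, flush on a timer
        FLUSH_NOW               // case3: partial batch, idle wire; flush immediately for latency
    }
    
    class FlushPolicy {
        static FlushDecision decide(boolean batchIsFull, boolean channelWritable) {
            if (batchIsFull) {
                return FlushDecision.FLUSH_TO_NETTY_QUEUE;
            }
            return channelWritable ? FlushDecision.FLUSH_NOW : FlushDecision.BUFFER_AND_FLUSH_LATER;
        }
    }
    ```
    
    Only the partial-batch path consults writability, which matches the point that the check is a latency/throughput optimization rather than a correctness gate.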




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61373371
  
    I traced the message loss bug back to its source.
    
    I found that this issue was introduced by STORM-350 (https://github.com/apache/storm/pull/134/files#diff-4cbcb6fa47274d6cf66d70585f98cbefR202), i.e. after upgrading Disruptor from 2.10.1 to 3.2.1.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61393817
  
    Hi Ted, 
    
    Can you try this on your live cluster and contribute some real-world test results?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-60870953
  
    @tedxia 
    
    I got a chance to chat with Ted online. In summary, he is describing the following case (worker A -> worker B):
    1. B dies.
    2. After the ZK session timeout, ZK knows B is dead.
    3. A initiates the reconnection process to B. By default, it retries at most 300 times (the total retry time should be larger than 120 seconds, based on this comment in the config: `# Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker.`).
    4. ZK is under heavy load (consider a ZK tree with 100 thousand nodes and many, many watchers), so it may take minutes to notify A that B is dead.
    5. A does not get the notification from ZK before its 300 connection retries are exhausted; the reconnection fails and throws, which causes the worker to exit.
    
    Basically, two questions were raised. First, can we ensure that ZooKeeper stays responsive (< 1 minute)? Second, if a worker doesn't get an update about B from ZooKeeper after 300 reconnection retries, should the worker exit or continue to work?
    
    
    
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61392411
  
    When I reverted STORM-350 and tested it again, there was no more message loss.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-62516293
  
    @clockfly 



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61600325
  
    @nathanmarz @revans2 
    
    Do you want to take a look at it?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-82196521
  
    I will ask Ted to close this.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by d2r <gi...@git.apache.org>.
Github user d2r commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-81733977
  
    @miguno, @tedxia I am scanning through open pull requests: It seems #429 has been merged instead of this pull request and STORM-329 has been resolved.
    
    If this is correct, and we no longer need these changes, can we close this pull request?



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r19065812
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -330,6 +330,20 @@
                   (.send drainer node+port->socket)))
               (.clear drainer))))))
     
    +;; Check whether this messaging connection is ready to send data
    +(defn is-connection-ready [^IConnection connection]
    +  (if (instance?  ConnectionWithStatus connection)
    --- End diff --
    
    Here we have a type check (instance?), which may not be clean.
    
    Changing IConnection might be cleaner, but that may cause compatibility issues (suppose a user has already implemented a messaging layer against the current IConnection declaration).
    
    If the messaging connection extends ConnectionWithStatus, then we check its status.
    If the messaging connection does NOT extend ConnectionWithStatus, then we behave the same as before (we skip this check).
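    For readers less familiar with Clojure, the `instance?`-based fallback in `is-connection-ready` can be expressed in Java roughly as follows. The types are minimal hypothetical stand-ins: the diff only shows the names `IConnection` and `ConnectionWithStatus`, so `ConnectionWithStatus` is modeled as an abstract class (since connections are said to "extend" it), and its `Status` values and `status()` method are assumptions.
    
    ```java
    // Hypothetical stand-ins for the messaging types named in the diff.
    interface IConnection {
        void send(String msg);
    }
    
    abstract class ConnectionWithStatus implements IConnection {
        enum Status { CONNECTING, READY, CLOSED }
    
        abstract Status status();
    }
    
    class ConnectionChecks {
        // Connections that expose a status are checked; legacy IConnection
        // implementations are treated as ready, so they behave as before.
        static boolean isConnectionReady(IConnection connection) {
            if (connection instanceof ConnectionWithStatus) {
                return ((ConnectionWithStatus) connection).status() == ConnectionWithStatus.Status.READY;
            }
            return true;
        }
    }
    ```
    
    The design choice is the one described in the comment: the type check costs some cleanliness, but existing `IConnection` implementations keep working without modification.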



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66898172
  
    I'm still -1. I don't understand the changes to worker.clj. Workers should start processing and sending immediately regardless of whether they're connected to other workers yet. The connecting/reconnecting/buffering/dropping logic should be completely encapsulated within the netty classes (as it was with zeromq). 
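    One way to read this suggestion is that the worker always calls `send()` and the messaging client alone decides whether to deliver or buffer. A minimal sketch of that encapsulation, assuming a hypothetical single-threaded client with a bounded in-memory buffer; `BufferingClient`, `maxBuffered` and `onConnected` are illustrative names, and evicting the oldest message when the buffer is full is just one possible policy, not something this thread settled on.
    
    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.List;
    
    // Hypothetical client that hides connection state from its caller:
    // send() always returns immediately, connected or not.
    class BufferingClient {
        private final int maxBuffered;
        private final Deque<String> buffer = new ArrayDeque<>();
        private final List<String> delivered = new ArrayList<>();
        private boolean connected;
        private long dropped;
    
        BufferingClient(int maxBuffered) {
            this.maxBuffered = maxBuffered;
        }
    
        void send(String msg) {
            if (connected) {
                delivered.add(msg); // stands in for writing to the channel
                return;
            }
            // Not connected yet (or reconnecting): buffer, bounded to protect memory.
            if (buffer.size() == maxBuffered) {
                buffer.pollFirst(); // evict the oldest message
                dropped++;
            }
            buffer.addLast(msg);
        }
    
        // Invoked by the client's own (re)connect logic once the channel is up.
        void onConnected() {
            connected = true;
            while (!buffer.isEmpty()) {
                delivered.add(buffer.pollFirst());
            }
        }
    
        List<String> delivered() {
            return delivered;
        }
    
        long droppedCount() {
            return dropped;
        }
    }
    ```
    
    The worker never inspects connection state here; all connecting/buffering/dropping decisions stay inside the client.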



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61411420
  
    @clockfly OK, actually I did set up the whole test environment using 0.9.3-rc1, but I ran the perf test topology for too short a time, so I may not have seen the problem. I'll test again and let you know.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-81736347
  
    @d2r And btw I don't have permissions on GitHub to close this PR.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61410622
  
    Hi HeartSaVioR,
    
    For acked topologies, there is an at-least-once delivery guarantee.
    When a tuple is dropped, the tuple tracked on the spout side will time out and be reported as a failure, which shows up in the Storm UI as the "failed" count.
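    The timeout-driven failure can be sketched as follows. This is a simplified, hypothetical model of spout-side bookkeeping, not Storm's implementation: each emitted tuple ID is tracked with its emit time, and any tuple not acked within the message timeout (configured via `topology.message.timeout.secs`, 30 seconds by default) is counted as failed and handed back for replay. `PendingTracker` and its methods are illustrative names.
    
    ```java
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical spout-side tracker. A tuple dropped anywhere downstream is
    // never acked, so it ages out here and is reported as failed.
    class PendingTracker {
        private final long timeoutMs;
        private final Map<String, Long> pending = new LinkedHashMap<>();
        private long failedCount;
    
        PendingTracker(long timeoutMs) {
            this.timeoutMs = timeoutMs;
        }
    
        void emitted(String msgId, long nowMs) {
            pending.put(msgId, nowMs);
        }
    
        void acked(String msgId) {
            pending.remove(msgId);
        }
    
        // Returns the IDs that timed out; a reliable spout would re-emit these.
        List<String> expire(long nowMs) {
            List<String> timedOut = new ArrayList<>();
            Iterator<Map.Entry<String, Long>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, Long> e = it.next();
                if (nowMs - e.getValue() >= timeoutMs) {
                    timedOut.add(e.getKey());
                    it.remove();
                    failedCount++;
                }
            }
            return timedOut;
        }
    
        long failedCount() {
            return failedCount;
        }
    }
    ```
    
    This is why a dropped message costs up to one full timeout before it is replayed: nothing tells the spout about the drop, so it only learns through expiry.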


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-58608389
  
    ok, I will add some specific unit tests later, thank you.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-64870378
  
    I tested this on our cluster:
    ###### Before applying this patch
    ```
    2014-11-28 15:03:56 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-lg-hadoop-tst-st03.bj/10.2.201.68:49967... [48]
    2014-11-28 15:04:00 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:49967... [49]
    2014-11-28 15:04:04 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:49967... [50]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-A/xxx.xxx.xxx.xxx:49967
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-A/xxx.xxx.xxx.xxx:49967..., timeout: 600000ms, pendings: 0 
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-28 15:04:08 o.a.c.r.ExponentialBackoffRetry [WARN] maxRetries too large (50). Pinning to 29
    2014-11-28 15:04:08 b.s.u.StormBoundedExponentialBackoffRetry [INFO] The baseSleepTimeMs [100] the maxSleepTimeMs [4000] the maxRetries [50]
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] New Netty Client, connect to B, 46389, config: , buffer_size: 5242880
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-B/xxx.xxx.xxx.xxx:46389... [0]    
    2014-11-28 15:04:08 b.s.m.n.Client [INFO] connection established to a remote host  Netty-Client-B/xxx.xxx.xxx.xxx:46389, [id: 0x0aa5eefe, /xxx.xxx.xxx.xxx:59716 =>  Netty-Client-B/xxx.xxx.xxx.xxx:46389]
    
    ```
    The log describes the following sequence of events:
    1. The worker sends messages to worker A, but A has already died;
    2. The worker keeps trying to reconnect to worker A until it exceeds the maximum number of retries;
    3. Meanwhile, this worker sends more messages to worker A, but send and connect are synchronized, so send waits until the reconnection finishes;
    4. Meanwhile, refresh-connections in worker.clj is running, but it won't call close() on the client for worker A until send finishes, because it must first acquire the endpoint-socket-lock:
    ```
         ->>   (write-locked (:endpoint-socket-lock worker)
                    (reset! (:cached-task->node+port worker)
                            (HashMap. my-assignment)))
                  (doseq [endpoint remove-connections]
                    (.close (get @(:cached-node+port->socket worker) endpoint)))
    ```
    but send is currently holding the endpoint-socket-lock:
    ```
      (disruptor/clojure-handler
          (fn [packets _ batch-end?]
            (.add drainer packets)
            
            (when batch-end?
       ->>  (read-locked endpoint-socket-lock
                (let [node+port->socket @node+port->socket]
                  (.send drainer node+port->socket)))
              (.clear drainer))))))
    ```
    5. After the reconnection fails, the client calls close() and changes its status to Closed;
    6. When send is called, the Client status is Closed, so the messages are dropped;
    7. After send finishes, refresh-connections proceeds: it first closes the client for worker A (already closed, so nothing is logged) and then connects to the new worker B.
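    The lock interaction in steps 3, 4 and 7 can be illustrated with `java.util.concurrent.locks.ReentrantReadWriteLock`. This is a hypothetical Java analogue of `endpoint-socket-lock`, not the worker's Clojure code: while the send path holds the read lock and is stuck reconnecting, the write lock that refresh-connections needs cannot be acquired, so closing the dead client has to wait until send returns. `EndpointLockDemo` is an illustrative name.
    
    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    // Hypothetical Java analogue of the endpoint-socket-lock interaction.
    class EndpointLockDemo {
        // Returns {writeLockAcquiredDuringSend, writeLockAcquiredAfterSend}.
        static boolean[] run() {
            try {
                return simulate();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    
        private static boolean[] simulate() throws InterruptedException {
            ReentrantReadWriteLock endpointSocketLock = new ReentrantReadWriteLock();
            CountDownLatch sendStarted = new CountDownLatch(1);
            CountDownLatch reconnectGaveUp = new CountDownLatch(1);
    
            // Send path: takes the read lock, then blocks in a synchronous reconnect.
            Thread sender = new Thread(() -> {
                endpointSocketLock.readLock().lock();
                try {
                    sendStarted.countDown();
                    reconnectGaveUp.await(); // stands in for the retry loop
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endpointSocketLock.readLock().unlock();
                }
            });
            sender.start();
            sendStarted.await();
    
            // refresh-connections: needs the write lock before closing the dead client.
            boolean duringSend = endpointSocketLock.writeLock().tryLock();
            if (duringSend) {
                endpointSocketLock.writeLock().unlock();
            }
    
            reconnectGaveUp.countDown(); // retries exhausted, send returns
            sender.join();
    
            boolean afterSend = endpointSocketLock.writeLock().tryLock();
            if (afterSend) {
                endpointSocketLock.writeLock().unlock();
            }
            return new boolean[] {duringSend, afterSend};
        }
    }
    ```
    
    `run()` returns `{false, true}`: the write lock is unavailable for as long as the reconnect holds the read lock, which is exactly the window in which messages to the dead worker pile up and are then dropped.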
    
    
    ###### After applying this patch
    ```
    2014-11-28 14:22:33 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:45909... [0]
    2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 4 messages...
    2014-11-28 14:22:33 b.s.m.n.Client [ERROR] The Connection channel currently is not available, dropping pending 10 messages...
    ```
    While reconnecting to worker A, sending messages to worker A fails:
    ```
          // msgs iterator is invalid after this call, we cannot use it further
          int msgCount = iteratorSize(msgs);
    
          // the connection is down, drop pending messages
          LOG.error("The Connection channel currently is not available, dropping pending " + msgCount + " messages...");
    ``` 




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61393103
  
    High Availability test
    ===============
    
    Test scenario: 4 machines (A, B, C, D), 4 workers, 1 worker on each machine
    
    Test case 1 (STORM-404): on machine A, kill the worker. A will start a new worker on the same port.
    ------------
    Expected result: reconnection will succeed.
    
    Actual result:
    The other workers start to reconnect and eventually succeed, because A starts a new worker on the same port.
    ```
    2014-11-02T09:31:24.988+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [84]
    2014-11-02T09:31:25.498+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [85]
    2014-11-02T09:31:25.498+0800 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-IDHV22-04/192.168.1.54:6703, [id: 0x54466bab, /192.168.1.51:51336 => IDHV22-04/192.168.1.54:6703]
    ```
    
    Test case 2 (STORM-404): on machine A, kill the worker, then immediately start a process that occupies the worker's port, which forces Storm to relocate the worker to a new port (or a new machine).
    --------------
    Expected result: the reconnection process will fail, because Storm relocates the worker to a new port.
    
    Actual result:
    First, after many reconnection attempts, the reconnection is aborted; no exception is thrown:
    ```
    2014-11-02T09:31:14.753+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [63]
    2014-11-02T09:31:18.065+0800 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-IDHV22-04/192.168.1.54:6703... [70]
            at org.apache.storm.netty.handler.codec.oneone.OneToOneEncoder.doEncode(OneToOneEncoder.java:71) ~[storm-core-0.9.3-rc2-SNAPSHOT.jar:0.9.3-rc2-SNAPSHOT]
            ...
    2014-11-02T09:45:36.209+0800 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-IDHV22-04/192.168.1.54:6703..., timeout: 600000ms, pendings: 0
    2014-11-02T09:45:36.209+0800 b.s.m.n.Client [INFO] connection is closing, abort reconnecting...
    ```
    
    Second, a new connection is made to the new worker (on a new port, or on another machine):
    
    (previously the worker was at IDHV22-04:6703; it was then relocated to IDHV22-03:6702)
    ```
    2014-11-02T09:45:36.206+0800 b.s.m.n.Client [INFO] New Netty Client, connect to IDHV22-03, 6702, config: , buffer_size: 5242880
    2014-11-02T09:45:36.207+0800 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-IDHV22-03/192.168.1.53:6702, [id: 0x538fdacb, /192.168.1.51:56047 => IDHV22-03/192.168.1.53:6702]
    ```
    
    Test case 3: check the failed message count before and after the worker crash
    ----------------
    Expected result: after the worker crash, there will be some message loss. After things stabilize, the message loss will not increase.
    
    Actual result: meets expectations.
    
    
    Test case 4: check the throughput change before and after the worker crash
    --------------
    Expected result: there should be no performance drop.
    
    Actual result: 
    
    When Storm starts a new worker on the same machine, there is no performance drop.
    See the first gap in the following image.
    
    ![network bandwidth change before and after worker crash](https://issues.apache.org/jira/secure/attachment/12678758/worker-kill-recover3.jpg)
    
    When Storm starts a new worker on a different machine, it may reduce parallelism; see the second gap in the picture above. Before the worker crash, there are 4 workers on 4 machines. After the worker crash, there are 3 workers on 4 machines. Parallelism drops, so throughput drops.
    
    Test case 5 (STORM-510): when a target worker crashes, sending messages to other workers should not be blocked.
    --------------
    Expected result: one connection should not block another in the case of a worker crash.
    
    Actual result:
    The blocking logic has been removed from the code, so one connection will not block another.
    However, during the failure transition period many messages sent to the crashed worker are lost, so max.spout.pending flow control may kick in and slow down the spout's emit rate. Overall, the maximum throughput will be lower.
    
    After the transition, throughput goes back to normal. In my test, the transition period was around 40 seconds.
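    The slowdown comes from the spout-side cap on in-flight tuples (`topology.max.spout.pending`): tuples sent to the crashed worker are never acked, so they occupy pending slots until they time out, and the spout stops emitting once the cap is reached. A minimal hypothetical model of that gate (`SpoutGate` and its methods are illustrative names, not Storm classes):
    
    ```java
    // Hypothetical model of max-spout-pending flow control.
    class SpoutGate {
        private final int maxPending;
        private int pending;
    
        SpoutGate(int maxPending) {
            this.maxPending = maxPending;
        }
    
        // The spout emits a new tuple only while fewer than maxPending are in flight.
        boolean tryEmit() {
            if (pending >= maxPending) {
                return false;
            }
            pending++;
            return true;
        }
    
        // An ack, or a failure such as a message timeout, frees a slot.
        void complete() {
            if (pending > 0) {
                pending--;
            }
        }
    
        int pending() {
            return pending;
        }
    }
    ```
    
    During the transition, slots held by lost tuples are only released by timeouts, so `tryEmit()` returns false more often and overall throughput falls until the pending set drains.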
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61071050
  
    The unit tests pass.
    
    Tests run: 174, Assertions: 101503, Failures: 0, Errors: 0
    
    [INFO]
    [INFO] Storm ............................................. SUCCESS [1.313s]
    [INFO] maven-shade-clojure-transformer ................... SUCCESS [1.110s]
    [INFO] Storm Core ........................................ SUCCESS [2:08.715s]
    [INFO] storm-starter ..................................... SUCCESS [4.184s]
    [INFO] storm-kafka ....................................... SUCCESS [43.727s]
    [INFO] storm-hdfs ........................................ SUCCESS [0.916s]
    [INFO] storm-hbase ....................................... SUCCESS [0.943s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-60871537
  
    Ted raised a good point: ```storm.messaging.netty.max_retries``` should also be larger than the worker session timeout.
    
    Currently, storm.messaging.netty.max_retries is larger than ```nimbus.task.launch.secs``` and ```supervisor.worker.start.timeout.secs```; it should also be larger than ```storm.zookeeper.session.timeout``` (after which the worker session is invalidated).
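A minimal Java sketch of such a sanity check, assuming the comparison is about the total time spent retrying rather than the raw retry count, and assuming a simplified doubling backoff between a minimum and a maximum wait (roughly storm.messaging.netty.min_wait_ms and max_wait_ms). This is not Storm's exact retry policy; `ReconnectWindowCheck` and its methods are hypothetical, and the values in `main` are illustrative only.

```java
/** Hypothetical sanity check: does the reconnect window outlast the relevant timeouts? */
public final class ReconnectWindowCheck {

    /** Total sleep time across all retries under a simplified doubling backoff capped at maxWaitMs. */
    static long reconnectWindowMs(int maxRetries, long minWaitMs, long maxWaitMs) {
        long total = 0;
        long sleep = minWaitMs;
        for (int i = 0; i < maxRetries; i++) {
            total += Math.min(sleep, maxWaitMs);
            sleep = Math.min(sleep * 2, maxWaitMs); // keep the sleep capped (also avoids overflow)
        }
        return total;
    }

    /** nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are in seconds;
     *  storm.zookeeper.session.timeout is in milliseconds. */
    static boolean outlastsTimeouts(long windowMs, long nimbusTaskLaunchSecs,
                                    long supervisorWorkerStartTimeoutSecs, long zkSessionTimeoutMs) {
        return windowMs > nimbusTaskLaunchSecs * 1000
            && windowMs > supervisorWorkerStartTimeoutSecs * 1000
            && windowMs > zkSessionTimeoutMs;
    }

    public static void main(String[] args) {
        long window = reconnectWindowMs(300, 100, 1000); // illustrative values only
        System.out.println(window + " ms, ok=" + outlastsTimeouts(window, 120, 120, 20000));
        // prints "297500 ms, ok=true"
    }
}
```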



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66420678
  
    @clockfly threadpool.schedule() isn't a blocking operation, because, as the Javadoc says:
    ```
     In particular, because it acts as a fixed-sized pool using corePoolSize threads and an unbounded queue, adjustments to maximumPoolSize have no useful effect.
    ```
    For more information, please refer to [ScheduledThreadPoolExecutor](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html)
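A small Java check of this behaviour, using only java.util.concurrent. `submitMany` is a hypothetical helper that schedules many delayed no-op tasks on a one-thread pool; each schedule() call just enqueues the task in the executor's unbounded delay queue and returns, so the caller is not blocked even though none of the tasks has run yet.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public final class ScheduleIsNonBlocking {
    /** Submits n delayed tasks and returns how long the submissions took, in ms. */
    static long submitMany(ScheduledThreadPoolExecutor pool, int n) {
        long start = System.nanoTime();
        for (int i = 0; i < n; i++) {
            // Delayed far into the future, so no task can run (or occupy the thread) during the loop.
            pool.schedule(() -> { }, 1, TimeUnit.HOURS);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        long elapsed = submitMany(pool, 10_000);
        // All tasks are waiting in the unbounded queue; the submitting thread never blocked.
        System.out.println("queued=" + pool.getQueue().size() + " elapsedMs=" + elapsed);
        pool.shutdownNow();
    }
}
```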



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73920741
  
    > So, I don't think your TODO comment is a issue, it is actually designed like this, how do you think?
    
    That makes sense to me.  I'll remove the TODO.
    
    > Go ahead and submit a new PR regarding storm-329, make sure give Ted his credits:)
    
    Will do.
    
    Credits for Ted:  But of course! Note that attribution (here: to Ted) is given when merging the pull request, i.e. when the CHANGELOG is updated.  I will highlight this in the pull request though.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59772115
  
    Discussing the 3 approaches is interesting!
    
    I think this is the key point for selecting an approach:
    Does "replaying tuples via the tuple timeout" make us feel safe?
    
    If we feel comfortable and safe with it, choosing approach A is no problem.
    If not, we need to choose approach B.
    If we additionally need to be aware of task reassignment, we need to choose approach C.
    
    Please correct me if I'm wrong.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by MaximeDiab <gi...@git.apache.org>.
Github user MaximeDiab commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r24071792
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -142,6 +147,15 @@ public void run() {
                     
                 }
             };
    +
    +        connector = new Runnable() {
    +            @Override
    +            public void run() {
    +                if (!closing) {
    +                    connect();
    +                }
    +            }
    +        };
             
             long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
    --- End diff --
    
    > we should use Math.max(...)?
    
    No, I do not think so, because Math.max returns the larger of its arguments.
    Math.min(30L * 1000, max_sleep_ms * max_retries) returns 30 seconds if max_sleep_ms * max_retries is greater than 30 s, and max_sleep_ms * max_retries otherwise.
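The capping behaviour can be checked with a tiny Java sketch. `initialDelayMs` is a hypothetical helper that mirrors the quoted line, with `maxSleepMs` and `maxRetries` standing in for `max_sleep_ms` and `max_retries`.

```java
public final class InitialDelayCap {
    static final long MAX_INITIAL_DELAY_MS = 30L * 1000; // the "max wait for 30s" from the code comment

    static long initialDelayMs(long maxSleepMs, long maxRetries) {
        // Math.min picks the smaller value, so the result can never exceed 30 s.
        return Math.min(MAX_INITIAL_DELAY_MS, maxSleepMs * maxRetries);
    }

    public static void main(String[] args) {
        System.out.println(initialDelayMs(1000, 300)); // 300 s requested, capped: prints 30000
        System.out.println(initialDelayMs(100, 10));   // 1 s requested, kept: prints 1000
    }
}
```

With Math.max the first call would instead return 300000, i.e. the 30 s value would act as a floor rather than a ceiling.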



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by fmazoyer <gi...@git.apache.org>.
Github user fmazoyer commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-71063105
  
    Hello,
    we recently ran into issue STORM-404 in Storm 0.9.3.
    I was just curious whether more work is planned to fix it.
    Or could the work already done for STORM-404 be summarized in some way?
    
    Thanks a lot for your help :-)




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-71137506
  
    STORM-404 is covered by this issue, and I have used this branch on our production cluster for more than four weeks. You can merge this branch and test again.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-68334058
  
    @nathanmarz, could you have a look at this? Thanks a lot.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73671537
  
    PS: I just noticed that I put the wrong Storm ticket into the commit https://github.com/miguno/storm/commit/8ebaaf8dbc63df3c2691e0cc3ac5102af7721ec3.  The `STORM-327` prefix of the commit message should have been `STORM-329`.
    
    I will keep the current code/commits as is in order to not disrupt the ongoing discussion in this thread.  Once we're happy with the code I will create a separate pull request, using the correct `STORM-329` ticket reference.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r18687520
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/IConnection.java ---
    @@ -40,7 +41,7 @@
          * @param msgs
          */
     
    -    public void send(Iterator<TaskMessage> msgs);
    +    public void send(ArrayList<TaskMessage> msgs);
    --- End diff --
    
    Why the change? And if it does need to be changed, it shouldn't be specific to ArrayList but should be the least restrictive interface possible.
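One possible reading of "least restrictive interface", sketched in Java: accept `Iterable<TaskMessage>` so callers can pass an ArrayList or any other collection. The `TaskMessage` class here is a minimal stand-in, and `Connection`/`CountingConnection` are hypothetical names, not Storm's IConnection. If an implementation needs the batch size up front, `Collection` would be the next most general choice.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public final class SendSignatureSketch {
    /** Minimal stand-in for a task message: target task id plus payload. */
    static final class TaskMessage {
        final int task;
        final byte[] message;
        TaskMessage(int task, byte[] message) { this.task = task; this.message = message; }
    }

    /** Accepting Iterable instead of ArrayList lets callers pass any collection type. */
    interface Connection {
        void send(Iterable<TaskMessage> msgs);
    }

    /** Toy implementation that only counts the messages it was given. */
    static final class CountingConnection implements Connection {
        int received = 0;
        @Override public void send(Iterable<TaskMessage> msgs) {
            for (TaskMessage m : msgs) {
                received++;
            }
        }
    }

    public static void main(String[] args) {
        CountingConnection conn = new CountingConnection();
        List<TaskMessage> batch = new ArrayList<>(Arrays.asList(
            new TaskMessage(1, new byte[0]), new TaskMessage(2, new byte[0])));
        conn.send(batch);                   // an ArrayList works...
        conn.send(new ArrayDeque<>(batch)); // ...and so does any other Collection
        System.out.println(conn.received);  // prints 4
    }
}
```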



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66620738
  
    @tedxia 
    
    Thanks, I think this will work.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66620748
  
    +1



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59676638
  
    The updated patch still requires more functional testing and performance regression testing.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66948950
  
    @ptgoetz, I have merged the master branch, and all tests pass.
    @clockfly, could you explain the change to worker.clj? Thank you.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r24039811
  
    --- Diff: storm-core/src/jvm/backtype/storm/messaging/netty/Client.java ---
    @@ -142,6 +147,15 @@ public void run() {
                     
                 }
             };
    +
    +        connector = new Runnable() {
    +            @Override
    +            public void run() {
    +                if (!closing) {
    +                    connect();
    +                }
    +            }
    +        };
             
             long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
    --- End diff --
    
    Either the comment or the code is incorrect.
    
    If we want to wait a maximum of 30s, then we should use `Math.max(...)`.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61752674
  
    I tested this patch on our production cluster, with five machines, each running at most 6 workers.
    
    A Trident-based topology ran for about 5 hours without failures.
    
    
    Then I killed one worker, called A, and found the log below on worker B. Worker B did not exit when worker A died.
    ```
    2014-11-04 17:18:08 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:21812... [47]
    2014-11-04 17:18:12 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:21812... [48]
    2014-11-04 17:18:16 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:21812... [49]
    2014-11-04 17:18:20 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:21812... [50]
    2014-11-04 17:18:24 b.s.m.n.Client [INFO] Closing Netty Client Netty-Client-A/xxx.xxx.xxx.xxx:21812
    2014-11-04 17:18:24 b.s.m.n.Client [INFO] Waiting for pending batchs to be sent with Netty-Client-A/xxx.xxx.xxx.xxx:21812..., timeout: 600000ms, pendings: 0
    2014-11-04 17:18:24 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    2014-11-04 17:18:24 b.s.m.n.Client [INFO] Client is being closed, and does not take requests any more, drop the messages...
    ```
    
    After worker A died, Nimbus rescheduled a new worker, F, and worker B connected to worker F.
    ```
    2014-11-04 17:16:53 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/xxx.xxx.xxx.xxx:21812... [21]
    2014-11-04 17:16:54 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-F/xxx.xxx.xxx.xxx:21813... [17]
    2014-11-04 17:16:54 b.s.m.n.Client [INFO] connection established to a remote host Netty-Client-F/xxx.xxx.xxx.xxx:21813, [id: 0xbf721a18, /xxx.xxx.xxx.xxx:63811 => F/xxx.xxx.xxx.xxx:21813]
    2014-11-04 17:16:55 b.s.m.n.Client [INFO] Reconnect started for Netty-Client-A/10.2.201.65:21812... [22]
    ```
    Worker B connected to worker F successfully before it closed its connection to worker A.
    
    Because this is our production cluster, I have rewritten the hostnames and IPs in the log.
    




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r19088253
  
    --- Diff: storm-core/src/clj/backtype/storm/daemon/worker.clj ---
    @@ -330,6 +330,20 @@
                   (.send drainer node+port->socket)))
               (.clear drainer))))))
     
    +;; Check whether this messaging connection is ready to send data
    +(defn is-connection-ready [^IConnection connection]
    +  (if (instance?  ConnectionWithStatus connection)
    --- End diff --
    
    Yes, if we add a status() method to IConnection, every implementation of IConnection must implement it, which means all implementations need to change.
    We can discuss it further with contributors / committers to decide whether to break compatibility to make the design clearer, or to keep compatibility.
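The compatibility-preserving option corresponds to the `instance?` check in the diff: leave IConnection unchanged and let only status-aware connections extend an opt-in class. Below is a simplified Java sketch; all types are hypothetical stand-ins (not Storm's classes) and the string payload is purely illustrative. In this sketch, connections that don't report a status are treated as ready, and connections that are not ready are skipped rather than waited on.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified stand-ins for the types discussed; not Storm's actual classes. */
public final class ReadinessCheck {
    interface IConnection {
        void send(String msg);
    }

    enum Status { CONNECTING, READY, CLOSED }

    /** Opt-in base class: only status-aware connections extend it, so IConnection stays unchanged. */
    abstract static class ConnectionWithStatus implements IConnection {
        abstract Status status();
    }

    /** Connections that cannot report a status are assumed to be ready. */
    static boolean isConnectionReady(IConnection c) {
        if (c instanceof ConnectionWithStatus) {
            return ((ConnectionWithStatus) c).status() == Status.READY;
        }
        return true;
    }

    /** Sends only to ready connections; one that is still reconnecting is skipped, not waited on. */
    static int sendToReady(List<IConnection> conns, String msg) {
        int sent = 0;
        for (IConnection c : conns) {
            if (isConnectionReady(c)) {
                c.send(msg);
                sent++;
            }
        }
        return sent;
    }

    public static void main(String[] args) {
        IConnection legacy = msg -> { };
        ConnectionWithStatus reconnecting = new ConnectionWithStatus() {
            @Override Status status() { return Status.CONNECTING; }
            @Override public void send(String msg) { }
        };
        System.out.println(sendToReady(Arrays.asList(legacy, reconnecting), "tuple")); // prints 1
    }
}
```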



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-59487520
  
    Suppose we are sending data from worker A to worker B. To solve STORM-404 (Worker on one machine crashes due to a failure of another worker on another machine),
    
    I think we can adopt the following logic:
    
    case 1: when B is down:
    1. B is lost, but A still believes B is alive.
    2. A tries to send data to B, which triggers a reconnect.
    3. Nimbus finds that B is lost and notifies A.
    4. A is notified that B is down; it needs to interrupt the reconnection from step 2 (by closing the connection).
    5. The reconnection from step 2 is interrupted and exits. It does not throw a RuntimeException.
    
    The key change is at step 4: A needs to interrupt the reconnection to an obsolete worker.
    
    case 2: when B is alive, but the connection from A to B is down:
    1. A triggers the reconnection logic.
    2. The reconnection times out.
    3. A cannot handle this failure, so A throws a RuntimeException.
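The two cases above can be sketched as an asynchronous reconnect loop that close() can interrupt. This is a minimal Java sketch under stated assumptions, not the patch's Client.java: `ReconnectingClient`, the `BooleanSupplier` connect stand-in and the fixed retry delay are hypothetical. Case 1 maps to close() setting `closing`, so the pending retry exits quietly; case 2 maps to exhausting `maxRetries`, where this sketch only sets a `gaveUp` flag, because an exception thrown inside a scheduled task would be captured in its Future rather than reaching the caller.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of an interruptible, asynchronous reconnect loop; not Storm's Client.java. */
public final class ReconnectingClient {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final BooleanSupplier tryConnect; // stand-in for a real connection attempt
    private final int maxRetries;
    private final long retryDelayMs;
    final AtomicInteger attempts = new AtomicInteger();
    volatile boolean closing = false;
    volatile boolean connected = false;
    volatile boolean gaveUp = false;

    ReconnectingClient(BooleanSupplier tryConnect, int maxRetries, long retryDelayMs) {
        this.tryConnect = tryConnect;
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
    }

    void start() {
        scheduler.execute(this::attempt);
    }

    private void attempt() {
        if (closing || connected) {
            return; // case 1: close() was called, so stop quietly instead of throwing
        }
        int n = attempts.incrementAndGet();
        if (tryConnect.getAsBoolean()) {
            connected = true;
            return;
        }
        if (n >= maxRetries) {
            gaveUp = true; // case 2: peer still assigned but unreachable; a real client would escalate here
            return;
        }
        try {
            // Retry later without blocking any thread in the meantime.
            scheduler.schedule(this::attempt, retryDelayMs, TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // close() shut the scheduler down between the check above and this call; nothing to do.
        }
    }

    /** Called when the peer is known to be obsolete, e.g. after a reassignment notification. */
    void close() {
        closing = true;
        scheduler.shutdownNow();
    }
}
```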



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by miguno <gi...@git.apache.org>.
Github user miguno commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-82217610
  
    @clockfly We may also want to check why we don't have the permissions to close this PR ourselves.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by clockfly <gi...@git.apache.org>.
Github user clockfly commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-73910685
  
    @miguno,
    
    Go ahead and submit a new PR regarding STORM-329, and make sure to give Ted his credit :)



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by nathanmarz <gi...@git.apache.org>.
Github user nathanmarz commented on a diff in the pull request:

    https://github.com/apache/storm/pull/268#discussion_r18687511
  
    --- Diff: storm-core/src/clj/backtype/storm/messaging/local.clj ---
    @@ -45,10 +46,10 @@
         (let [send-queue (add-queue! queues-map lock storm-id port)]
           (.put send-queue (TaskMessage. taskId payload))
           ))
    -  (^void send [this ^Iterator iter]
    +  (^void send [this ^ArrayList msgs]
         (let [send-queue (add-queue! queues-map lock storm-id port)]
    -      (while (.hasNext iter) 
    -         (.put send-queue (.next iter)))
    +      (fast-list-iter [task msgs]
    +        (.put send-queue (TaskMessage. (.task task) (.message task))))
    --- End diff --
    
    This needs type hints to avoid reflection.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-61803120
  
    ```
    High Availability test
    
    test scenario: 4 machine A,B,C,D, 4 worker, 1 worker on each machine
    
    test case1(STORM-404): on machine A, kill worker. A will create a new worker taking the same port.
    ```
    @clockfly, I have a question about your test case: the Storm scheduler avoids scheduling a new worker on the same ip:port after a worker crash.
    
    And if the scheduler did not behave as I described, then in your test case 2 it would keep scheduling the new worker on the same ip:port, so the behavior would not change even though you occupy the port.




[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by ptgoetz <gi...@git.apache.org>.
Github user ptgoetz commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-63127132
  
    I tested this by applying it to the 0.9.3 branch and found problems with the unit tests (never-ending cycle of zookeeper reconnects, tests never complete).
    
    @tedxia @clockfly Could you guys take a look and perhaps up merge against the 0.9.3-branch branch? I might have made a mistake during conflict resolution.



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia closed the pull request at:

    https://github.com/apache/storm/pull/268



[GitHub] storm pull request: STORM-329 : buffer message in client and recon...

Posted by tedxia <gi...@git.apache.org>.
Github user tedxia commented on the pull request:

    https://github.com/apache/storm/pull/268#issuecomment-66422581
  
    I tested this on our cluster, and the results are below:
    
    Before removing the sleep in connect
    ![storm](https://cloud.githubusercontent.com/assets/8066671/5373174/d4364ffc-808d-11e4-8641-8647ca4d3581.png)
    
    After removing the sleep in connect
    ![stormx1](https://cloud.githubusercontent.com/assets/8066671/5373187/ed25ac7e-808d-11e4-99ce-b63e0d5ea399.png)
    
    After removing the sleep in connect, we got a more than 100% speed increase. Most importantly, a failed Client no longer blocks sending messages to other workers.
    
    @clockfly, could you test this on your cluster and give some advice?


