Posted to dev@kafka.apache.org by "Prashanth Menon (Created) (JIRA)" <ji...@apache.org> on 2012/03/15 14:11:39 UTC

[jira] [Created] (KAFKA-305) SyncProducer does not correctly timeout

SyncProducer does not correctly timeout
---------------------------------------

                 Key: KAFKA-305
                 URL: https://issues.apache.org/jira/browse/KAFKA-305
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.7, 0.8
            Reporter: Prashanth Menon
            Priority: Critical


It turns out that using the channel in SyncProducer for blocking reads, as we do now, will not trigger socket timeouts (even though we set one), so a read can block forever.  This bug identifies the issue: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article presents a potential workaround: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel. The workaround is a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads.
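
A minimal sketch of the workaround, written against the plain JDK API rather than Kafka's own code. The class name TimedReadSketch, the method readTimesOut and the throwaway local server are illustrative assumptions, not part of any patch on this issue. It sets SO_TIMEOUT on the channel's socket and reads through a ReadableByteChannel built from the socket's InputStream, which is the read path that honours the timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class; not taken from the KAFKA-305 patches.
class TimedReadSketch {
    /** Returns true if reading from a silent peer fails with SocketTimeoutException. */
    static boolean readTimesOut(int timeoutMs) {
        // The local server never writes; the connection completes via the listen backlog.
        try (ServerSocket silentServer = new ServerSocket(0);
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(true);
            channel.socket().setSoTimeout(timeoutMs);
            channel.connect(new InetSocketAddress("127.0.0.1", silentServer.getLocalPort()));

            // channel.read(...) would ignore SO_TIMEOUT. Reading through the socket's
            // InputStream, re-wrapped as a channel, applies the timeout.
            ReadableByteChannel readChannel =
                Channels.newChannel(channel.socket().getInputStream());
            try {
                readChannel.read(ByteBuffer.allocate(16));
                return false; // data or end-of-stream arrived, so no timeout
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Writes can keep going through the SocketChannel itself; only the read side needs the wrapped channel.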

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13237021#comment-13237021 ] 

Jun Rao commented on KAFKA-305:
-------------------------------

If this is indeed a ZK issue, we can probably check/wait that the ephemeral node is gone before restarting the broker.
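
The check/wait idea could look roughly like the polling helper below. This is only a sketch: waitUntil is a hypothetical helper, and the condition passed to it (for example, a check that /brokers/ids/0 no longer exists) is left to the caller, because the exact ZooKeeper client call is not specified in this thread.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper; not part of Kafka or ZkClient.
class WaitSketch {
    /**
     * Polls `condition` every `pollMs` milliseconds until it returns true or
     * `timeoutMs` has elapsed. Returns whether the condition was met in time.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) return true;
            if (System.nanoTime() - deadline >= 0) return false;
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return false;
            }
        }
    }
}
```

A test would call something like waitUntil(() -> !nodeExists("/brokers/ids/0"), 5000, 50) before restarting the broker, where nodeExists stands in for whatever existence check the test utilities provide.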
                

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13232415#comment-13232415 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

Unfortunately, I didn't get a chance to work on this over the weekend.  From my point of view, creating a new ReadableByteChannel that wraps the socket channel's InputStream seems like the simplest solution.  Then the SyncProducer will have a writeChannel (the SocketChannel) and a readChannel (the wrapped version).  All writes and reads go through the respective channels, with the addition of timeout functionality.  

Another step we can take is to move all that logic into some class BlockingChannel which can be reused on the consumer side in SimpleConsumer.  Such a class would have, perhaps, four methods: connect, disconnect, send and receive.  Connect and disconnect would be synchronized, send would take a Request object and receive would return a Tuple2[Receive, Int] like usual.  Send and receive will need to be synchronized externally, meaning the class can be effectively treated like a regular Channel otherwise ...

Thoughts?
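
To make the proposal concrete, here is a rough sketch of such a class in plain Java. It is an illustration under assumptions, not the attached patch: the name BlockingChannelSketch, the constructor parameters and the echoOnce demo helper are made up here, and send/receive use raw ByteBuffers instead of the Request and Tuple2[Receive, Int] types mentioned above.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; names and signatures are assumptions, not the patch.
class BlockingChannelSketch {
    private final String host;
    private final int port;
    private final int readTimeoutMs;
    private SocketChannel writeChannel;      // the SocketChannel, used for writes
    private ReadableByteChannel readChannel; // wraps the socket InputStream, honours SO_TIMEOUT

    BlockingChannelSketch(String host, int port, int readTimeoutMs) {
        this.host = host;
        this.port = port;
        this.readTimeoutMs = readTimeoutMs;
    }

    synchronized void connect() throws IOException {
        if (writeChannel != null) return; // already connected
        SocketChannel ch = SocketChannel.open();
        try {
            ch.configureBlocking(true);
            ch.socket().setSoTimeout(readTimeoutMs);
            ch.connect(new InetSocketAddress(host, port));
        } catch (IOException e) {
            ch.close(); // do not leak a half-opened channel
            throw e;
        }
        writeChannel = ch;
        readChannel = Channels.newChannel(ch.socket().getInputStream());
    }

    synchronized void disconnect() throws IOException {
        if (writeChannel == null) return;
        SocketChannel ch = writeChannel;
        writeChannel = null;
        readChannel = null;
        ch.close();
    }

    // send and receive are not synchronized here; callers serialize them externally.
    void send(ByteBuffer request) throws IOException {
        while (request.hasRemaining()) writeChannel.write(request);
    }

    // Reads exactly `size` bytes; a stalled peer surfaces as SocketTimeoutException.
    ByteBuffer receive(int size) throws IOException {
        ByteBuffer response = ByteBuffer.allocate(size);
        while (response.hasRemaining()) {
            if (readChannel.read(response) < 0) throw new EOFException("peer closed the connection");
        }
        response.flip();
        return response;
    }

    // Demo only: sends `message` to a throwaway local echo server and returns the reply.
    static String echoOnce(String message) {
        byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
        try (ServerSocket server = new ServerSocket(0)) {
            Thread echo = new Thread(() -> {
                try (Socket peer = server.accept()) {
                    byte[] buf = new byte[bytes.length];
                    new DataInputStream(peer.getInputStream()).readFully(buf);
                    peer.getOutputStream().write(buf);
                } catch (IOException ignored) {
                    // demo server: the client side will report the failure
                }
            });
            echo.setDaemon(true);
            echo.start();
            BlockingChannelSketch ch =
                new BlockingChannelSketch("127.0.0.1", server.getLocalPort(), 2000);
            ch.connect();
            try {
                ch.send(ByteBuffer.wrap(bytes));
                return new String(ch.receive(bytes.length).array(), StandardCharsets.UTF_8);
            } finally {
                ch.disconnect();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the two channel references in one object means a producer or consumer only has to hold a single field and cannot accidentally read through the channel that ignores the timeout.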
                

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13238045#comment-13238045 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

I've attached another non-blocking implementation that uses selectors, but I'm not seeing any significant performance boost on my machine.  I tested it on the producer side using the ProducerPerformance class by varying the number of messages, the message sizes and the number of threads.  Each test scenario was run four times and the average result was used.  Find the results here: https://gist.github.com/2202142.  

For what it's worth, I think we should go ahead with the simple solution attached in the v2 patch - if everyone is okay with it, please commit.  Regarding the test error, it could potentially be a valid ZK or ZKClient bug.  I can investigate a little by digging into ZKClient and asking around the mailing list and channels.  Keeping the test in makes the test run fail unpredictably.  Though I'm not entirely okay with keeping the bug in, waiting for the node to go down doesn't seem to be the right solution either.  
                

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13236782#comment-13236782 ] 

Jun Rao commented on KAFKA-305:
-------------------------------

Prashanth,

v2 patch looks good. 

As for 5, I do see transient failures of testZKSendWithDeadBroker. This is a bit weird. During broker shutdown, we close the ZK client, which should cause all ephemeral nodes to be deleted in ZK. Could you verify if this is indeed the behavior of ZK?

As for BrokerPartitionInfo and ProducerPool, we should clean up dead brokers. Could you open a separate jira to track that?
                

        

[jira] [Resolved] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Jun Rao (Resolved) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-305.
---------------------------

       Resolution: Fixed
    Fix Version/s: 0.8
         Assignee: Prashanth Menon

Prashanth,

Thanks for the patch. I agree that v2 is less risky than the selector approach, so we can revisit the selector approach later. Thanks for the selector patch too; it will probably be useful in the future. Committed v2 patch to 0.8 branch with the following minor changes in DefaultEventHandler:
* log all unsent messages
* maintain outstandingRequests properly on both successful and unsuccessful sends.

Could you file 2 jiras, one for taking out dead brokers in ProducerPool and another for transient failures due to ZK ephemeral node not deleted in time?
                

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13233969#comment-13233969 ] 

Jun Rao commented on KAFKA-305:
-------------------------------

Prashanth,

Thanks for the patch. This is very useful. Some comments:

1. I think it makes sense for SimpleConsumer to use BlockingChannel as well. Could you change that in this patch too?
2. ProducerTest.testZKSendWithDeadBroker: This test doesn't really test the timeout on getting a response. We probably need to create a mock Kafka server (one that doesn't send a response) to test this out.
3. BlockingChannel: 
3.1 We probably should rename timeoutMs to readTimeoutMs since only reads are subject to the timeout.
3.2 We should pass in a socketSendBufferSize and a socketReceiveBufferSize.
3.3 Should host and port be part of the constructor? It seems to me it's cleaner if each instance of BlockingChannel is tied to 1 host and 1 port.

I'd also be interested in your findings on the comparison with NIO with selectors.

                

        

[jira] [Updated] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashanth Menon updated KAFKA-305:
----------------------------------

    Attachment: KAFKA-305-v2.patch
    

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13238873#comment-13238873 ] 

Neha Narkhede commented on KAFKA-305:
-------------------------------------

I found the zookeeper related problem, filed KAFKA-320 and also included a patch.
                

        

[jira] [Updated] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashanth Menon updated KAFKA-305:
----------------------------------

    Attachment: BlockingChannel2.scala
    

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13236976#comment-13236976 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

Thanks for the input everyone.  Regarding the ZK failure, that is effectively the trace I'm seeing on my end as well - the log makes it clear that the ephemeral nodes get deleted but the test still fails when creating them afterwards.  

I would like to delay committing this patch, at least for the weekend, as I'd like to run a small benchmark against a pure NIO implementation.  The benefits there would be timeouts for both read and write operations and a potential performance boost.
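
For comparison, a timed read in the pure NIO style could be sketched as below. This is not the implementation being benchmarked; SelectorReadSketch, readWithTimeout and the timedOut demo helper are hypothetical names used only for illustration. The channel is switched to non-blocking mode and a Selector waits for readiness for at most the given time.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical sketch of a selector-based timed read.
class SelectorReadSketch {
    /**
     * Waits up to timeoutMs (must be > 0) for the non-blocking channel to become
     * readable, then performs one read. Throws SocketTimeoutException if nothing
     * became readable in that time.
     */
    static int readWithTimeout(SocketChannel channel, ByteBuffer buffer, long timeoutMs)
            throws IOException {
        try (Selector selector = Selector.open()) {
            channel.register(selector, SelectionKey.OP_READ);
            if (selector.select(timeoutMs) == 0) {
                throw new SocketTimeoutException("no data within " + timeoutMs + " ms");
            }
            return channel.read(buffer);
        }
    }

    /** Demo only: returns true if the read timed out against a local test peer. */
    static boolean timedOut(boolean peerWrites, long timeoutMs) {
        try (ServerSocket server = new ServerSocket(0);
             SocketChannel channel = SocketChannel.open(
                     new InetSocketAddress("127.0.0.1", server.getLocalPort()));
             Socket peer = server.accept()) {
            if (peerWrites) {
                peer.getOutputStream().write(new byte[] {1, 2, 3});
            }
            channel.configureBlocking(false); // selectors require non-blocking mode
            try {
                readWithTimeout(channel, ByteBuffer.allocate(8), timeoutMs);
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A write timeout would follow the same pattern with SelectionKey.OP_WRITE. A real implementation would presumably keep one Selector per connection instead of opening one per call, which is done here only to keep the sketch short.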
                

        

[jira] [Updated] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashanth Menon updated KAFKA-305:
----------------------------------

    Attachment: KAFKA-305-v1.patch
    

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13235614#comment-13235614 ] 

Jun Rao commented on KAFKA-305:
-------------------------------

Prashanth,

2. If you want to make sure that a broker is shut down, you need to call kafkaServer.awaitShutdown after calling kafkaServer.shutdown. Overall, I don't quite understand how the new test works. It only brought down 1 broker and yet the comment says all brokers are down. If all brokers are indeed down, any RPC call to the broker should get a broken pipe or socket closed exception immediately, not a socket timeout exception. So, to really test that the timeout works, we need to keep the broker alive and somehow delay the response from the server. This can probably be done with a mock request handler.
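
The distinction can be reproduced with plain sockets. The sketch below is an illustration only: TimeoutTestSketch and readOutcome are hypothetical names, and a bare ServerSocket stands in for the broker or mock request handler. A peer that has gone away is noticed immediately, while a peer that stays connected but never answers is the only case that exercises the read timeout.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SocketChannel;

// Hypothetical test helper; a ServerSocket plays the role of the broker.
class TimeoutTestSketch {
    /** Returns "timeout", "closed" or "data" depending on how the read ended. */
    static String readOutcome(boolean peerStaysAlive) {
        try (ServerSocket server = new ServerSocket(0);
             SocketChannel client = SocketChannel.open()) {
            client.socket().setSoTimeout(300);
            client.connect(new InetSocketAddress("127.0.0.1", server.getLocalPort()));
            try (Socket peer = server.accept()) {
                if (!peerStaysAlive) peer.close(); // simulate a broker that has gone away
                ReadableByteChannel in =
                    Channels.newChannel(client.socket().getInputStream());
                try {
                    // An alive peer that never responds leaves this read waiting.
                    return in.read(ByteBuffer.allocate(8)) < 0 ? "closed" : "data";
                } catch (SocketTimeoutException e) {
                    return "timeout";
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the peer kept alive and silent the read ends in a timeout; with the peer closed it ends at once with end-of-stream, so only the first case says anything about whether the timeout works.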
                

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13236942#comment-13236942 ] 

Neha Narkhede commented on KAFKA-305:
-------------------------------------

v2 looks good. 

Regarding the test failure, I debugged it and see a probable bug with either Zookeeper or ZkClient. See below - 

[info] Test Starting: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
Shutting down broker 0
[2012-03-23 11:50:36,870] DEBUG Deleting ephemeral node /brokers/ids/0 for session 0x13640e55f240013 (org.apache.zookeeper.server.DataTree:831)
[2012-03-23 11:50:36,873] DEBUG Deleting ephemeral node /brokers/topics/new-topic/partitions/3/leader for session 0x13640e55f240013 (org.apache.zookeeper.server.DataTree:831)
[2012-03-23 11:50:36,873] DEBUG Deleting ephemeral node /brokers/topics/new-topic/partitions/1/leader for session 0x13640e55f240013 (org.apache.zookeeper.server.DataTree:831)
[2012-03-23 11:50:36,873] DEBUG Deleting ephemeral node /brokers/topics/new-topic/partitions/2/leader for session 0x13640e55f240013 (org.apache.zookeeper.server.DataTree:831)
[2012-03-23 11:50:36,873] DEBUG Deleting ephemeral node /brokers/topics/new-topic/partitions/0/leader for session 0x13640e55f240013 (org.apache.zookeeper.server.DataTree:831)
Shut down broker 0
Restarting broker 0
[2012-03-23 11:50:45,194] DEBUG Deleting ephemeral node /brokers/ids/1 for session 0x13640e55f24001b (org.apache.zookeeper.server.DataTree:831)
[error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
	at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
	at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
	at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
	at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:173)

Notice that after shutting down broker 0, the ephemeral node was deleted from its in-memory data tree. That happens as part of the close session workflow. Still, when we try to create the ephemeral node again, it complains that it already exists. 

I'll come back to this ZooKeeper bug later. I'd say let's check in this test since it helps reproduce this ZK bug. 

I think your patch looks good. 
                

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13239050#comment-13239050 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

Awesome!
                
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Assignee: Prashanth Menon
>            Priority: Critical
>             Fix For: 0.8
>
>         Attachments: BlockingChannel2.scala, KAFKA-305-v1.patch, KAFKA-305-v2.patch
>
>
> So it turns out that using the channel in SyncProducer like we are to perform blocking reads will not trigger socket timeouts (though we set it) and will block forever which is bad.  This bug identifies the issue: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=4614802 and this article presents a potential work-around: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel for workaround. The work-around is a simple solution that involves creating a separate ReadableByteChannel instance for timeout-enabled reads.


        

[jira] [Updated] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashanth Menon updated KAFKA-305:
----------------------------------

    Attachment: KAFKA-305-v2.patch
    
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Priority: Critical
>         Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch


        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13235295#comment-13235295 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

Thanks for the review, Jun.

1. Will do.
2. So that test actually exposed the issue to begin with: the initial send would fail and then hang forever when attempting to refresh the topic metadata.  Regardless, I'll create a separate, more direct test for timeouts.  On my local machine, this test seems to fail unpredictably around 30% of the time.  In these cases, it seems like the ephemeral broker nodes aren't removed from ZK, and bringing a broker back up after shutdown throws a "Broker already exists" exception.  Is anyone else experiencing this, or is it just me?  Increasing the wait time after shutdown helps, but not 100% of the time.
3. 1,2,3 Sounds fair.

I should be able to get a patch in for this by Friday.  I'll then continue on KAFKA-49 over the weekend and get it in on Saturday or Sunday, should the review go okay.  Apologies for the delays :(
                
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Priority: Critical
>         Attachments: KAFKA-305-v1.patch


        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13235643#comment-13235643 ] 

Neha Narkhede commented on KAFKA-305:
-------------------------------------

Prashanth,

Thanks for the patch. A couple of suggestions -

1. Since you are adding a new abstraction, BlockingChannel, would it make sense to change SimpleConsumer to use it? It's your call if you'd rather fix it in another JIRA.
2. In BlockingChannel, since you are synchronizing on a lock, is there any reason for the connected boolean to be volatile? Also, you can avoid resetting the read and write channels to null values in disconnect.
3. Let's add some more tests for this, since it is unclear whether the workaround of wrapping the input stream in a channel actually works. I like Jun's suggestion of mocking out the request handler to achieve this. Tests would include SyncProducer as well as the async producer (DefaultEventHandler).
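
To illustrate point 2, here is a minimal Java sketch, not taken from the patch. The class name GuardedConnectionSketch and its methods are hypothetical. It assumes every read and write of the connected flag happens while holding the same lock, in which case the monitor already provides the visibility guarantee and volatile adds nothing.

```java
public class GuardedConnectionSketch {
    private final Object lock = new Object();
    // Plain (non-volatile) field: it is only ever touched while holding 'lock'.
    private boolean connected = false;

    public void connect() {
        synchronized (lock) {
            if (!connected) {
                // Real code would open the socket here (omitted in this sketch).
                connected = true;
            }
        }
    }

    public void disconnect() {
        synchronized (lock) {
            // Real code would close the socket here (omitted in this sketch).
            connected = false;
        }
    }

    public boolean isConnected() {
        synchronized (lock) {
            return connected;
        }
    }
}
```

If some code path read the flag without taking the lock, volatile (or taking the lock there too) would be needed again.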
                
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Priority: Critical
>         Attachments: KAFKA-305-v1.patch


        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13233119#comment-13233119 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

Hi all, I've attached a patch.  Some notes:

- New class called BlockingChannel that has timeouts enabled.
- SyncProducer uses BlockingChannel instead of creating its own SocketChannel
- Re-introduced testZKSendWithDeadBroker, which passes now.
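
For readers unfamiliar with the workaround, the idea can be sketched in plain Java as below. This is an illustration under stated assumptions, not the attached patch: the class name TimedReadSketch and its helper methods are hypothetical. It sets SO_TIMEOUT on the channel's socket and reads through a ReadableByteChannel wrapped around the socket's InputStream, then checks the behaviour against a local server that never replies.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class TimedReadSketch {
    /** Connects in blocking mode and returns a read channel whose reads honor timeoutMs. */
    public static ReadableByteChannel openTimedReadChannel(
            SocketChannel channel, InetSocketAddress addr, int timeoutMs) throws IOException {
        channel.configureBlocking(true);
        channel.socket().setSoTimeout(timeoutMs);
        channel.connect(addr);
        // Read via the socket's InputStream instead of channel.read(...),
        // so that the SO_TIMEOUT set above applies to reads.
        return Channels.newChannel(channel.socket().getInputStream());
    }

    /** Returns true if a read against a server that never replies times out. */
    public static boolean readTimesOut(int timeoutMs) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            // The server binds but never accepts or writes; the connection
            // still completes because it sits in the listen backlog.
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            InetSocketAddress addr = (InetSocketAddress) server.getLocalAddress();
            ReadableByteChannel in = openTimedReadChannel(client, addr, timeoutMs);
            try {
                in.read(ByteBuffer.allocate(4));
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read timed out: " + readTimesOut(200));
    }
}
```

Writes would still go through the original SocketChannel; only reads need the wrapped channel.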

I'd like to get feedback on this.  It's simple and may be reused on the consumer side.  When I think about it, it would be nice to combine SimpleConsumer and SyncProducer into one generic "SimpleClient" since the functionality is effectively the same.

I'd also like to benchmark this against a pure NIO implementation where we can use selectors to enable timeout functionality.  It'll be more complex and will require minor adjustments to BoundedByteBuffer and BoundedByteBufferSend, but it may be worth it.
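
As a rough idea of what the selector-based alternative could look like, here is a minimal Java sketch. It is an assumption about the approach, not code from Kafka: SelectorReadSketch, readWithTimeout and timesOutAgainstSilentServer are hypothetical names. The channel is switched to non-blocking mode and a Selector waits for readability for at most the given timeout.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SelectorReadSketch {
    /**
     * Reads into buf, or throws SocketTimeoutException if the channel is not
     * readable within timeoutMs. timeoutMs must be positive (0 would block forever).
     */
    public static int readWithTimeout(SocketChannel channel, ByteBuffer buf, long timeoutMs)
            throws IOException {
        channel.configureBlocking(false); // selectors require non-blocking mode
        try (Selector selector = Selector.open()) {
            channel.register(selector, SelectionKey.OP_READ);
            if (selector.select(timeoutMs) == 0) {
                throw new SocketTimeoutException("no data within " + timeoutMs + " ms");
            }
            return channel.read(buf);
        }
    }

    /** Returns true if the timed read fails against a server that never replies. */
    public static boolean timesOutAgainstSilentServer(long timeoutMs) throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             SocketChannel client = SocketChannel.open()) {
            server.bind(new InetSocketAddress("127.0.0.1", 0));
            client.connect(server.getLocalAddress());
            try {
                readWithTimeout(client, ByteBuffer.allocate(4), timeoutMs);
                return false;
            } catch (SocketTimeoutException e) {
                return true;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("read timed out: " + timesOutAgainstSilentServer(200));
    }
}
```

A real implementation would likely keep one selector per connection rather than opening one per read, and would loop until the whole response has arrived; both are left out here for brevity.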
                
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Priority: Critical


        

[jira] [Updated] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Prashanth Menon updated KAFKA-305:
----------------------------------

    Attachment:     (was: KAFKA-305-v2.patch)
    
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Priority: Critical
>         Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-305) SyncProducer does not correctly timeout

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13236291#comment-13236291 ] 

Prashanth Menon commented on KAFKA-305:
---------------------------------------

I've uploaded a new patch with the suggestions, but it's not ready for commit, just another review.  A few notes:

1. BlockingChannel modified to meet suggestions.
2. SimpleConsumer uses BlockingChannel.
3. To test the BlockingChannel (in SyncProducer and async producer), I bring up a regular server but shut down the request handler.  So the socket remains open, accepts requests and queues them in the request channel, but there are no handlers processing requests.
4. The original testZKSendWithDeadBroker wasn't commented entirely correctly.  I've modified it to actually test what the name suggests.
5. Though I wait for the broker to go down, testZKSendWithDeadBroker still unpredictably throws the "Broker already registered" exception.  Are you experiencing this locally?

I think there might be an issue with the BrokerPartitionInfo and ProducerPool classes.  ProducerPool never removes producers even if one is connected to a downed broker, so calls to getAnyProducer (used by BrokerPartitionInfo.updateInfo to update cached topic metadata information) could return the same "bad" producer on consecutive calls when attempting to refresh the cache.  This could potentially cause an entire send to fail even though there may exist a broker that is able to service the topic metadata request.  We need to somehow remove "bad" producers, or refresh the ProducerPool when brokers go down, or have BrokerPartitionInfo retry its updateInfo call a certain number of times.  Thoughts?
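
One way to picture the retry option is the generic Java sketch below. It is only an illustration of the idea, not Kafka code: RetryRefreshSketch and firstSuccessful are hypothetical names, and the "producers" are stand-in values. It tries a different candidate on each attempt, up to a bound, so a single "bad" producer cannot fail the whole refresh.

```java
import java.util.List;
import java.util.function.Function;

public class RetryRefreshSketch {
    /**
     * Applies 'request' to each candidate in turn, making at most maxAttempts
     * attempts, and returns the first successful result.
     */
    public static <P, R> R firstSuccessful(List<P> candidates, Function<P, R> request, int maxAttempts) {
        RuntimeException last = null;
        int attempts = Math.min(maxAttempts, candidates.size());
        for (int i = 0; i < attempts; i++) {
            try {
                return request.apply(candidates.get(i));
            } catch (RuntimeException e) {
                // Treat this candidate as "bad" and move on to the next one.
                last = e;
            }
        }
        throw new IllegalStateException("all " + attempts + " attempts failed", last);
    }
}
```

For example, with candidates "dead" and "live", where the request throws for "dead", the call returns the result obtained from "live". Removing the failed candidate from the pool, as suggested above, would be a natural extension.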
                
> SyncProducer does not correctly timeout
> ---------------------------------------
>
>                 Key: KAFKA-305
>                 URL: https://issues.apache.org/jira/browse/KAFKA-305
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Prashanth Menon
>            Priority: Critical
>         Attachments: KAFKA-305-v1.patch, KAFKA-305-v2.patch

--