Posted to dev@kafka.apache.org by "Neha Narkhede (Created) (JIRA)" <ji...@apache.org> on 2012/03/26 23:28:30 UTC

[jira] [Created] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
--------------------------------------------------------------------------

                 Key: KAFKA-320
                 URL: https://issues.apache.org/jira/browse/KAFKA-320
             Project: Kafka
          Issue Type: Bug
          Components: core
    Affects Versions: 0.7, 0.8
            Reporter: Neha Narkhede
            Assignee: Neha Narkhede
            Priority: Critical


The testZKSendWithDeadBroker test in ProducerTest fails intermittently with the following exception:

[error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
        at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
        at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
        at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
        at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
        at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at junit.framework.TestCase.runTest(TestCase.java:164)
        at junit.framework.TestCase.runBare(TestCase.java:130)
        at junit.framework.TestResult$1.protect(TestResult.java:110)
        at junit.framework.TestResult.runProtected(TestResult.java:128)
        at junit.framework.TestResult.run(TestResult.java:113)
        at junit.framework.TestCase.run(TestCase.java:120)
        at junit.framework.TestSuite.runTest(TestSuite.java:228)
        at junit.framework.TestSuite.run(TestSuite.java:223)
        at junit.framework.TestSuite.runTest(TestSuite.java:228)
        at junit.framework.TestSuite.run(TestSuite.java:223)
        at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
        at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
        at sbt.TestRunner.run(TestFramework.scala:53)
        at sbt.TestRunner.runTest$1(TestFramework.scala:67)
        at sbt.TestRunner.run(TestFramework.scala:76)
        at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
        at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
        at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
        at sbt.NamedTestTask.run(TestFramework.scala:92)
        at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
        at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
        at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
        at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
        at sbt.impl.RunTask.runTask(RunTask.scala:85)
        at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
        at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
        at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
        at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
        at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
        at sbt.Control$.trapUnit(Control.scala:19)
        at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)

The test restarts a server and fails with this exception during the restart.

This is unexpected, since shutting down server1 should delete its broker id registration from ZK. Here is the Kafka bug that causes the problem:

In the test, server1.shutdown() closes the zkClient associated with the broker, which successfully deletes the broker's registration info from ZooKeeper. After this, server1 can be started again successfully. When the test completes, tearDown() calls server1.shutdown() again. This time the server does not actually shut down, because shutdown() is guarded by the isShuttingDown variable, which startup() never resets to false. This leaves an open zkClient connection after the current test run.
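A minimal Java sketch of the guard problem described above. It assumes a simplified server whose only state is the isShuttingDown flag and whether its ZooKeeper client is open. BuggyServer and its fields are hypothetical stand-ins, not the actual KafkaServer code, which is written in Scala.

```java
// Hypothetical model of the bug: names are illustrative, not the real KafkaServer source.
class BuggyServer {
    private boolean isShuttingDown = false;
    private boolean zkClientOpen = false;

    void startup() {
        // Opens the ZooKeeper client but never resets isShuttingDown.
        zkClientOpen = true;
    }

    void shutdown() {
        if (isShuttingDown) {
            return; // guard: after the first shutdown, every later call is a silent no-op
        }
        isShuttingDown = true;
        zkClientOpen = false; // closing the client removes the broker's ephemeral node
    }

    boolean isZkClientOpen() {
        return zkClientOpen;
    }
}
```

Calling startup(), shutdown(), startup(), shutdown() on this model leaves isZkClientOpen() returning true. This matches the leaked connection left behind by the test's tearDown().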

If you re-run ProducerTest without exiting sbt, the test first brings up the ZooKeeper server. Because the Kafka server from the previous run is still running, it can successfully renew its session with ZooKeeper and retain the /brokers/ids/0 ephemeral node. If it does this before server1.startup() is called in the test, the test fails.
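The race can be modeled with a toy in-memory registry of ephemeral nodes. ToyZk is hypothetical and is not the ZooKeeper or ZkClient API. It assumes only two ZooKeeper semantics: an ephemeral node lives as long as the session that created it, and creating a node that already exists fails.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for ZooKeeper ephemeral nodes (hypothetical, not a real client API).
// Each node is owned by a session id and disappears when that session is closed.
class ToyZk {
    private final Map<String, Long> ephemeralOwners = new HashMap<>();

    void createEphemeral(String path, long sessionId) {
        if (ephemeralOwners.containsKey(path)) {
            // Mirrors the failure in the report: the path is still held by another session.
            throw new IllegalStateException("A broker is already registered on the path " + path);
        }
        ephemeralOwners.put(path, sessionId);
    }

    void closeSession(long sessionId) {
        // Closing a session removes every ephemeral node it owns.
        ephemeralOwners.values().removeIf(owner -> owner == sessionId);
    }

    boolean exists(String path) {
        return ephemeralOwners.containsKey(path);
    }
}
```

While session 1 is alive, createEphemeral("/brokers/ids/0", 2L) throws, just as server1.startup() does in the stack trace. Session 1 stands for the leaked client from the previous run. Once session 1 is closed, the registration succeeds.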

The fix is to set the shutdown-related variables correctly in KafkaServer's startup() API. Also, while debugging this, I found that the Producer does not close its zkClient either. Because of this, unit tests log a large number of WARN messages like the following:
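A sketch of both fixes under the same simplified model. It assumes startup() only needs to reset isShuttingDown and that the Producer owns its ZooKeeper client. FixedServer and SketchProducer are hypothetical names, and the committed patch may reset additional state.

```java
// Hypothetical sketch of the server fix: startup() re-arms shutdown().
class FixedServer {
    private boolean isShuttingDown = false;
    private boolean zkClientOpen = false;

    void startup() {
        isShuttingDown = false; // the fix: reset the shutdown flag on every startup
        zkClientOpen = true;
    }

    void shutdown() {
        if (isShuttingDown) {
            return; // still guards against repeated shutdown() calls after one startup
        }
        isShuttingDown = true;
        zkClientOpen = false;
    }

    boolean isZkClientOpen() {
        return zkClientOpen;
    }
}

// Hypothetical producer that owns a ZooKeeper client and releases it in close(),
// so no reconnecting client thread outlives the test.
class SketchProducer implements AutoCloseable {
    private boolean zkClientOpen = true;

    @Override
    public void close() {
        zkClientOpen = false; // stands in for zkClient.close()
    }

    boolean isZkClientOpen() {
        return zkClientOpen;
    }
}
```

With the reset in place, the startup(), shutdown(), startup(), shutdown() sequence closes the client both times.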

[2012-03-26 14:14:27,703] INFO Opening socket connection to server nnarkhed-ld /127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn:1061)
[2012-03-26 14:14:27,703] WARN Session 0x13650dbf8dd0005 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1188)
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
        at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13249129#comment-13249129 ] 

Jun Rao commented on KAFKA-320:
-------------------------------

OK, we can take v3 for now and track the broker startup/shutdown issue in KAFKA-328.
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, kafka-320-v3.patch, kafka-320.patch

[jira] [Issue Comment Edited] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Jun Rao (Issue Comment Edited) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13241396#comment-13241396 ] 

Jun Rao edited comment on KAFKA-320 at 3/29/12 5:21 PM:
--------------------------------------------------------

That's a good finding. We should probably patch it in both trunk and 0.8. Just one comment.

We should only allow a KafkaServer to start up if it has been shut down.
                
      was (Author: junrao):
    That's a good finding. We should probably patch it in both trunk and 0.8. Just one comment.

We should only allow a KafkaServer to startup unless it has been shutdown.
                  
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320.patch

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13244395#comment-13244395 ] 

Jun Rao commented on KAFKA-320:
-------------------------------

The ZookeeperTestHarness change looks nice. A couple more comments:

4. KafkaServer: It looks a bit more complex now, and some of the checks are not done atomically. How about the following?
4.1 Add an AtomicBoolean isServerStartable and initialize it to true.
4.2 In startup(), if we can atomically set isServerStartable from true to false, proceed with startup; otherwise, throw an exception.
4.3 In shutdown(), if isServerStartable is false, proceed with shutdown and, at the very end, set isServerStartable to true.
startup() and shutdown() are expected to be called from the same thread, so shutdown() won't be called until startup() completes.

5. SyncProducerTest: remove unused imports.
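Point 4 above can be sketched as follows. StartableServer is a hypothetical class, and its comments mark where the real server would start and stop its components. It uses java.util.concurrent.atomic.AtomicBoolean.compareAndSet so that only one startup() call can flip the flag and proceed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the isServerStartable scheme from point 4.
class StartableServer {
    // 4.1: true means startup() is currently allowed.
    private final AtomicBoolean isServerStartable = new AtomicBoolean(true);
    private volatile boolean running = false;

    void startup() {
        // 4.2: only a caller that flips true -> false may proceed; others are rejected.
        if (!isServerStartable.compareAndSet(true, false)) {
            throw new IllegalStateException("Server is already started");
        }
        running = true; // start components (ZooKeeper client, log manager, ...) here
    }

    void shutdown() {
        // 4.3: shut down only if the server was started.
        if (!isServerStartable.get()) {
            running = false; // stop components here
            // Re-enable startup only after shutdown has fully completed.
            isServerStartable.set(true);
        }
    }

    boolean isRunning() {
        return running;
    }
}
```

The shutdown() check uses a plain get() rather than a compare-and-set. This is adequate only under the assumption in point 4 that startup() and shutdown() are called from the same thread.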

                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320.patch

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13241383#comment-13241383 ] 

Neha Narkhede commented on KAFKA-320:
-------------------------------------

Can someone review this?
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320.patch
>
>
> The testZKSendWithDeadBroker inside ProducerTest fails intermittently with the following exception -
> [error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
> java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
>         at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
>         at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
>         at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
>         at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at junit.framework.TestCase.runTest(TestCase.java:164)
>         at junit.framework.TestCase.runBare(TestCase.java:130)
>         at junit.framework.TestResult$1.protect(TestResult.java:110)
>         at junit.framework.TestResult.runProtected(TestResult.java:128)
>         at junit.framework.TestResult.run(TestResult.java:113)
>         at junit.framework.TestCase.run(TestCase.java:120)
>         at junit.framework.TestSuite.runTest(TestSuite.java:228)
>         at junit.framework.TestSuite.run(TestSuite.java:223)
>         at junit.framework.TestSuite.runTest(TestSuite.java:228)
>         at junit.framework.TestSuite.run(TestSuite.java:223)
>         at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
>         at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
>         at sbt.TestRunner.run(TestFramework.scala:53)
>         at sbt.TestRunner.runTest$1(TestFramework.scala:67)
>         at sbt.TestRunner.run(TestFramework.scala:76)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.NamedTestTask.run(TestFramework.scala:92)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
>         at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
>         at sbt.impl.RunTask.runTask(RunTask.scala:85)
>         at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Control$.trapUnit(Control.scala:19)
>         at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
> The test basically restarts a server and fails with this exception during the restart.
> This is unexpected, since shutting down server1 should delete its broker id registration from ZK. Here is the Kafka bug causing this problem:
> During server1.shutdown() in the test, we do close the zkClient associated with the broker, and it successfully deletes the broker's registration info from ZooKeeper. After this, server1 can be successfully restarted. The test then completes, and teardown() calls server1.shutdown() again. This time the server does not actually shut down, because shutdown() is guarded by the isShuttingDown variable, which was never reset to false in the startup() API. This leaves the zkClient connection from the current test run open.
> If you re-run ProducerTest without exiting sbt, it first brings up the ZooKeeper server. Since the Kafka server from the previous run is still running, it can successfully renew its session with ZooKeeper and retain the /brokers/ids/0 ephemeral node. If it does this before server1.startup() is called in the test, the test fails.
> The fix is to reset the shutdown-related variables correctly in the startup API of KafkaServer. Also, while debugging this, I found that the Producer does not close its zkClient either. Because of this, unit tests log a whole bunch of WARN messages that look like this:
> [2012-03-26 14:14:27,703] INFO Opening socket connection to server nnarkhed-ld /127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn:1061)
> [2012-03-26 14:14:27,703] WARN Session 0x13650dbf8dd0005 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1188)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
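The failure sequence described above can be modeled in a few lines. This is a minimal sketch, not Kafka's code. `FakeZk` is a hypothetical in-memory stand-in for ZooKeeper in which closing a session deletes the ephemeral nodes that session owns. `BuggyServer` is a hypothetical simplification of the pre-fix KafkaServer lifecycle, whose `startup()` never resets `isShuttingDown`.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper (not a real API): it tracks live
// sessions and the ephemeral nodes each session owns.
class FakeZk {
  private var lastSession = 0L
  private val live = mutable.Set[Long]()
  private val ephemeral = mutable.Map[String, Long]()

  def openSession(): Long = { lastSession += 1; live += lastSession; lastSession }

  // Mimics the error raised by ZkUtils.registerBrokerInZk in the trace above.
  def createEphemeral(path: String, session: Long): Unit = {
    if (ephemeral.contains(path))
      throw new RuntimeException(s"A broker is already registered on the path $path")
    ephemeral(path) = session
  }

  // As in ZooKeeper, closing a session deletes every ephemeral node it owns.
  def closeSession(session: Long): Unit = {
    live -= session
    val owned = ephemeral.collect { case (path, owner) if owner == session => path }.toList
    ephemeral --= owned
  }

  def exists(path: String): Boolean = ephemeral.contains(path)
  def openSessions: Int = live.size
}

// Hypothetical simplification of the pre-fix KafkaServer lifecycle.
class BuggyServer(zk: FakeZk, brokerId: Int) {
  private var isShuttingDown = false
  private var session = -1L

  def startup(): Unit = {
    // Bug: isShuttingDown is not reset here, so a restarted server
    // silently ignores its next shutdown() call.
    session = zk.openSession()
    zk.createEphemeral(s"/brokers/ids/$brokerId", session)
  }

  def shutdown(): Unit =
    if (!isShuttingDown) {
      isShuttingDown = true
      zk.closeSession(session) // deregisters /brokers/ids/<brokerId>
    }
}
```

Calling `startup()`, `shutdown()`, `startup()`, `shutdown()` on one `BuggyServer` leaves `/brokers/ids/0` registered, and a second `BuggyServer(zk, 0).startup()` then throws the same "already registered" error as the test. In the sketch, the leaked session simply stays open. In the real test, the leftover zkClient from the earlier run reconnects and renews its session, which has the same effect on the node.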

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-320:
--------------------------------

    Resolution: Fixed
        Status: Resolved  (was: Patch Available)

Checked into trunk and the 0.8 branch.
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, kafka-320-v3.patch, kafka-320.patch
>
>

        

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13244425#comment-13244425 ] 

Neha Narkhede commented on KAFKA-320:
-------------------------------------

Thanks for the review!

4. Regarding this, what do people think about the conditions under which a Kafka server should be allowed to start up and shut down (listed under 2.1 and 2.2 above)?
5. Will fix this before check-in.
6. Also, it looks like improving Kafka server startup and shutdown is orthogonal to this bug fix. Can that be done (cleanly) in another JIRA? I'd like this check-in to include just the fix for this issue.
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320.patch
>
>

        

[jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-320:
--------------------------------

    Attachment: kafka-320.patch

This patch includes the following -

1. Fixes the Kafka server restart bug by resetting the shutdown state variables in the startup() API of KafkaServer.

2. Shuts down the zkClient in the Producer.

3. ZkClient and ZooKeeper logging can be set to WARN, since we fixed the real issue that was causing the many warnings during the unit tests.
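The first two changes in this patch can be sketched against the same kind of hypothetical in-memory ZooKeeper stand-in. `FakeZk`, `FixedServer` and `SketchProducer` are illustrative names, not Kafka classes, and this is a minimal model of the idea rather than the patch itself: `startup()` resets the shutdown flag, and the producer closes the session it opened.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for ZooKeeper (not a real API).
class FakeZk {
  private var lastSession = 0L
  private val live = mutable.Set[Long]()
  private val ephemeral = mutable.Map[String, Long]()

  def openSession(): Long = { lastSession += 1; live += lastSession; lastSession }

  def createEphemeral(path: String, session: Long): Unit = {
    if (ephemeral.contains(path))
      throw new RuntimeException(s"A broker is already registered on the path $path")
    ephemeral(path) = session
  }

  // Closing a session deletes the ephemeral nodes it owns.
  def closeSession(session: Long): Unit = {
    live -= session
    val owned = ephemeral.collect { case (path, owner) if owner == session => path }.toList
    ephemeral --= owned
  }

  def exists(path: String): Boolean = ephemeral.contains(path)
  def openSessions: Int = live.size
}

// Hypothetical model of change 1: reset shutdown state on every startup.
class FixedServer(zk: FakeZk, brokerId: Int) {
  private var isShuttingDown = false
  private var session = -1L

  def startup(): Unit = {
    isShuttingDown = false // the fix: a restarted server can be shut down again
    session = zk.openSession()
    zk.createEphemeral(s"/brokers/ids/$brokerId", session)
  }

  def shutdown(): Unit =
    if (!isShuttingDown) {
      isShuttingDown = true
      zk.closeSession(session)
    }
}

// Hypothetical model of change 2: a producer that releases its own ZooKeeper session.
class SketchProducer(zk: FakeZk) {
  private val session = zk.openSession()
  def close(): Unit = zk.closeSession(session)
}
```

With these changes, two start/stop cycles leave no `/brokers/ids/0` node behind, and closing the producer leaves no open session that could keep reconnecting and logging warnings.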

                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320.patch
>
>

        

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13241396#comment-13241396 ] 

Jun Rao commented on KAFKA-320:
-------------------------------

That's a good finding. We should probably patch it in both trunk and 0.8. Just one comment.

We should only allow a KafkaServer to start up if it has been shut down.
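One way to enforce the rule that a server may only start up after it has been shut down (and only shut down while running) is a compare-and-set guard. This is a minimal sketch under that assumption. `GuardedServer` is a hypothetical class, not Kafka's KafkaServer, and the counters stand in for the real startup and cleanup work.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical lifecycle guard: startup() only proceeds when the server is stopped,
// and shutdown() only when it is running. compareAndSet lets exactly one caller
// win each transition, even if threads race.
class GuardedServer {
  private val running = new AtomicBoolean(false)
  private val startCount = new AtomicInteger(0)
  private val stopCount = new AtomicInteger(0)

  /** Returns true if this call actually started the server. */
  def startup(): Boolean =
    if (running.compareAndSet(false, true)) {
      startCount.incrementAndGet() // real startup work (ZK registration, sockets) would go here
      true
    } else false // already running: ignored here; a real server might log or throw instead

  /** Returns true if this call actually shut the server down. */
  def shutdown(): Boolean =
    if (running.compareAndSet(true, false)) {
      stopCount.incrementAndGet() // real cleanup (closing the zkClient) would go here
      true
    } else false

  def starts: Int = startCount.get
  def stops: Int = stopCount.get
}
```

Because the flag is cleared at the start of `shutdown()`, a concurrent `startup()` could begin before cleanup finishes. A fuller version might track a separate shutting-down state to close that gap.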
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320.patch
>
>

        

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Prashanth Menon (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13241409#comment-13241409 ] 

Prashanth Menon commented on KAFKA-320:
---------------------------------------

Great catch!  +1, looks good and works on my machine.
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320.patch
>
>
>         at sbt.TestRunner.runTest$1(TestFramework.scala:67)
>         at sbt.TestRunner.run(TestFramework.scala:76)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.NamedTestTask.run(TestFramework.scala:92)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
>         at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
>         at sbt.impl.RunTask.runTask(RunTask.scala:85)
>         at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Control$.trapUnit(Control.scala:19)
>         at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
> The test restarts a server and fails with this exception during the restart.
> This is unexpected, since server1, after shutting down, should delete its broker id registration from ZK. Here is the Kafka bug causing this problem -
> In the test, server1.shutdown() closes the zkClient associated with the broker, which successfully deletes the broker's registration info from Zookeeper. After this, server1 can be successfully started again. When the test completes, teardown() calls server1.shutdown() again. This time the server doesn't actually shut down, because shutdown is guarded by the isShuttingDown variable, which was never reset to false in the startup() API. This leaves an open zkclient connection behind after the test run.
> If you re-run ProducerTest without exiting sbt, it first brings up the zookeeper server. Then, since the Kafka server from the previous run is still running, it can successfully renew its session with zookeeper and retain the /brokers/ids/0 ephemeral node. If it does this before server1.startup() is called in the test, the test fails.
> The fix is to set the shutdown-related variables correctly in the startup API of KafkaServer. Also, while debugging this, I found that the Producer doesn't close its zkclient either. Because of this, unit tests log a whole bunch of WARNs that look like -
> [2012-03-26 14:14:27,703] INFO Opening socket connection to server nnarkhed-ld /127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn:1061)
> [2012-03-26 14:14:27,703] WARN Session 0x13650dbf8dd0005 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1188)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
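The shutdown-guard bug described above can be sketched in Scala as follows. This is a simplified, hypothetical model: `GuardedServer` and `FakeZkClient` are illustrative names, not Kafka classes, and the `isShuttingDown` flag is assumed to be an `AtomicBoolean` checked with `compareAndSet`. Replaying the test's sequence (start, shut down, restart, shut down again in teardown) shows the restarted server's client left open unless `startup()` resets the flag.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the broker's ZooKeeper client: it only records
// whether it is still open (an open client keeps its session alive).
class FakeZkClient {
  @volatile var open: Boolean = true
  def close(): Unit = { open = false }
}

// Simplified, hypothetical model of a server whose shutdown() is guarded by an
// isShuttingDown flag. resetFlagOnStartup switches between the bug and the fix.
class GuardedServer(resetFlagOnStartup: Boolean) {
  private val isShuttingDown = new AtomicBoolean(false)
  var zkClient: Option[FakeZkClient] = None

  def startup(): Unit = {
    // The fix: clear the flag so that a later shutdown() is not skipped.
    if (resetFlagOnStartup) isShuttingDown.set(false)
    zkClient = Some(new FakeZkClient)
  }

  def shutdown(): Unit = {
    // Only a caller that flips the flag from false to true does the work;
    // if the flag is still true from an earlier shutdown, this is a no-op.
    if (isShuttingDown.compareAndSet(false, true))
      zkClient.foreach(_.close())
  }
}

// Replays the test: start, shut down, restart, then shut down again in teardown.
// Returns the client created by the restart.
def restartThenShutdown(server: GuardedServer): FakeZkClient = {
  server.startup()
  server.shutdown()
  server.startup()
  val restarted = server.zkClient.get
  server.shutdown() // teardown
  restarted
}
```

With `resetFlagOnStartup = false`, `restartThenShutdown(...).open` is `true`: the teardown shutdown is skipped and the client (and its ZooKeeper session) leaks. With the fix it is `false`.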


[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Jun Rao (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13244965#comment-13244965 ] 

Jun Rao commented on KAFKA-320:
-------------------------------

Yes, we can use another JIRA to look at improving Kafka server startup and shutdown. For this JIRA, we can just make minimal changes to the Kafka server.
                

[jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-320:
--------------------------------

    Attachment: kafka-320-v2.patch

OK, I made some more changes -

1. Cleaned up zkClient instance creation in unit tests. It is now wrapped inside ZookeeperTestHarness, so we ensure that it gets cleaned up at the appropriate time.

2. Changed Kafka server startup and shutdown behavior (possibly making it more complex). Basically:

2.1 A Kafka server can start up only if it is not already starting up, is not currently shutting down, and has not already been started.

2.2 A Kafka server can shut down only if it is not already shutting down, is not currently starting up, and has not already been shut down.
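One way to express rules 2.1 and 2.2, reading the three conditions in each rule as all required, is a single lifecycle state updated with compare-and-set. This is a hypothetical sketch: `LifecycleGuard` and the state names are illustrative and not taken from the patch, which may use separate flags instead. Following rule 2.2 literally, a server that was never started is also allowed to "shut down".

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical lifecycle states; not taken from Kafka's code.
sealed trait ServerState
case object NotStarted extends ServerState
case object StartingUp extends ServerState
case object Running extends ServerState
case object ShuttingDown extends ServerState
case object Shutdown extends ServerState

class LifecycleGuard {
  private val state = new AtomicReference[ServerState](NotStarted)

  def current: ServerState = state.get

  // Rule 2.1: start only if not starting up, not shutting down and not
  // already started. Returns true if this call ran `body`.
  def startup(body: => Unit): Boolean =
    transition(Set(NotStarted, Shutdown), StartingUp, Running)(body)

  // Rule 2.2: shut down only if not shutting down, not starting up and not
  // already shut down. Returns true if this call ran `body`.
  def shutdown(body: => Unit): Boolean =
    transition(Set(NotStarted, Running), ShuttingDown, Shutdown)(body)

  // Atomically claims the transitional state so two concurrent callers cannot
  // both run `body`; restores the previous state if `body` throws.
  private def transition(allowed: Set[ServerState], during: ServerState,
                         done: ServerState)(body: => Unit): Boolean = {
    val before = state.get
    if (allowed.contains(before) && state.compareAndSet(before, during)) {
      try {
        body
        state.set(done)
      } catch {
        case e: Throwable =>
          state.set(before)
          throw e
      }
      true
    } else false
  }
}
```

Keeping one state value instead of several booleans rules out combinations such as "started but still marked as shutting down", which is the kind of stale flag behind this bug. For example, after a successful `startup`, a second `startup` returns `false`, and a `startup` after `shutdown` is allowed again.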
                

[jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-320:
--------------------------------

    Status: Patch Available  (was: Open)
    

[jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Jun Rao (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-320:
--------------------------

    Attachment: kafka-320-v3-delta.patch

Overall, patch v3 looks good. I made a minor tweak to KafkaServer on top of v3. How does that look? To try it, you will need to:
1. apply patch v3
2. svn revert core/src/main/scala/kafka/server/KafkaServer.scala
3. apply patch kafka-320-v3-delta.patch
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, kafka-320-v3.patch, kafka-320.patch
>
>
> The testZKSendWithDeadBroker inside ProducerTest fails intermittently with the following exception -
> [error] Test Failed: testZKSendWithDeadBroker(kafka.producer.ProducerTest)
> java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0. This probably indicates that you either have configured a brokerid that is already in use, or else you have shutdown this broker and restarted it faster than the zookeeper timeout so it appears to be re-registering.
>         at kafka.utils.ZkUtils$.registerBrokerInZk(ZkUtils.scala:109)
>         at kafka.server.KafkaZooKeeper.kafka$server$KafkaZooKeeper$$registerBrokerInZk(KafkaZooKeeper.scala:60)
>         at kafka.server.KafkaZooKeeper.startup(KafkaZooKeeper.scala:52)
>         at kafka.server.KafkaServer.startup(KafkaServer.scala:84)
>         at kafka.producer.ProducerTest.testZKSendWithDeadBroker(ProducerTest.scala:174)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>         at java.lang.reflect.Method.invoke(Method.java:597)
>         at junit.framework.TestCase.runTest(TestCase.java:164)
>         at junit.framework.TestCase.runBare(TestCase.java:130)
>         at junit.framework.TestResult$1.protect(TestResult.java:110)
>         at junit.framework.TestResult.runProtected(TestResult.java:128)
>         at junit.framework.TestResult.run(TestResult.java:113)
>         at junit.framework.TestCase.run(TestCase.java:120)
>         at junit.framework.TestSuite.runTest(TestSuite.java:228)
>         at junit.framework.TestSuite.run(TestSuite.java:223)
>         at junit.framework.TestSuite.runTest(TestSuite.java:228)
>         at junit.framework.TestSuite.run(TestSuite.java:223)
>         at org.scalatest.junit.JUnit3Suite.run(JUnit3Suite.scala:309)
>         at org.scalatest.tools.ScalaTestFramework$ScalaTestRunner.run(ScalaTestFramework.scala:40)
>         at sbt.TestRunner.run(TestFramework.scala:53)
>         at sbt.TestRunner.runTest$1(TestFramework.scala:67)
>         at sbt.TestRunner.run(TestFramework.scala:76)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11.runTest$2(TestFramework.scala:194)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.TestFramework$$anonfun$10$$anonfun$apply$11$$anonfun$apply$12.apply(TestFramework.scala:205)
>         at sbt.NamedTestTask.run(TestFramework.scala:92)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.ScalaProject$$anonfun$sbt$ScalaProject$$toTask$1.apply(ScalaProject.scala:193)
>         at sbt.TaskManager$Task.invoke(TaskManager.scala:62)
>         at sbt.impl.RunTask.doRun$1(RunTask.scala:77)
>         at sbt.impl.RunTask.runTask(RunTask.scala:85)
>         at sbt.impl.RunTask.sbt$impl$RunTask$$runIfNotRoot(RunTask.scala:60)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.impl.RunTask$$anonfun$runTasksExceptRoot$2.apply(RunTask.scala:48)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Distributor$Run$Worker$$anonfun$2.apply(ParallelRunner.scala:131)
>         at sbt.Control$.trapUnit(Control.scala:19)
>         at sbt.Distributor$Run$Worker.run(ParallelRunner.scala:131)
> The test restarts a server and fails with this exception during the restart.
> This is unexpected, since server1, after shutting down, should trigger the deletion of its broker id registration from ZK. Here is the Kafka bug causing this problem:
> During server1.shutdown() in the test, we close the zkClient associated with the broker, and it successfully deletes the broker's registration info from ZooKeeper. After this, server1 can be successfully started. Then the test completes, and in teardown() we call server1.shutdown() again. This time the server doesn't actually shut down, because shutdown() is guarded by the isShuttingDown variable, which was never reset to false in the startup() API. This leaves an open zkClient connection after the current test run.
> If you re-run ProducerTest without exiting sbt, it first brings up the ZooKeeper server. Since the Kafka server from the previous run is still running, it can successfully renew its session with ZooKeeper and retain the /brokers/ids/0 ephemeral node. If it does this before server1.startup() is called in the test, the test fails.
> The fix is to set the shutdown-related variables correctly in the startup API of KafkaServer. Also, while debugging this, I found that we don't close the zkClient in the Producer either. Because of this, unit tests log a large number of WARN messages like the following:
> [2012-03-26 14:14:27,703] INFO Opening socket connection to server nnarkhed-ld /127.0.0.1:2182 (org.apache.zookeeper.ClientCnxn:1061)
> [2012-03-26 14:14:27,703] WARN Session 0x13650dbf8dd0005 for server null, unexpected error, closing socket connection and attempting reconnect (org.apache.zookeeper.ClientCnxn:1188)
> java.net.ConnectException: Connection refused
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567)
>         at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1146)
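A minimal Scala sketch of the failure mode described above. It assumes a simplified server whose shutdown() is guarded by an isShuttingDown flag. SketchServer and FakeZkClient are hypothetical stand-ins, not Kafka's KafkaServer or ZkClient. The sketch only shows why a flag that is never reset in startup() turns the second shutdown() into a no-op and leaves a ZooKeeper client open.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for the broker's ZooKeeper client; only tracks open/closed.
final class FakeZkClient {
  @volatile var open: Boolean = true
  def close(): Unit = { open = false }
}

// Hypothetical server whose shutdown() is guarded by an isShuttingDown flag.
// resetFlagOnStartup = false reproduces the bug; true applies the fix.
final class SketchServer(resetFlagOnStartup: Boolean) {
  private val isShuttingDown = new AtomicBoolean(false)
  @volatile var zkClient: Option[FakeZkClient] = None

  def startup(): Unit = {
    // The fix: a restarted server must be able to shut down again.
    if (resetFlagOnStartup) isShuttingDown.set(false)
    zkClient = Some(new FakeZkClient)
  }

  def shutdown(): Unit = {
    // Only proceeds if the flag was false; otherwise this call is a no-op.
    if (isShuttingDown.compareAndSet(false, true)) {
      zkClient.foreach(_.close())
      zkClient = None
    }
  }
}

object ShutdownFlagDemo {
  // Mirrors the test: start, shut down, restart, then shut down again in teardown.
  def leaksZkClient(resetFlagOnStartup: Boolean): Boolean = {
    val server = new SketchServer(resetFlagOnStartup)
    server.startup()
    server.shutdown()
    server.startup()
    server.shutdown() // teardown
    server.zkClient.exists(_.open)
  }

  def main(args: Array[String]): Unit = {
    println(s"buggy variant leaks ZK client: ${leaksZkClient(resetFlagOnStartup = false)}") // true
    println(s"fixed variant leaks ZK client: ${leaksZkClient(resetFlagOnStartup = true)}")  // false
  }
}
```

In the buggy variant the second shutdown() finds the flag still set from the first one. As a result, the client opened by the restart is never closed. This is the same kind of leftover connection that can keep /brokers/ids/0 alive across sbt runs.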

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

[jira] [Commented] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Commented) (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13249098#comment-13249098 ] 

Neha Narkhede commented on KAFKA-320:
-------------------------------------

After applying the v3-delta patch, I see that some zkclient connections are not closed cleanly:
[2012-04-06 17:34:12,805] WARN Exception causing close of session 0x0 due to java.io.IOException: ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn:639)
[2012-04-06 17:34:12,809] WARN Exception causing close of session 0x0 due to java.io.IOException: ZooKeeperServer not running (org.apache.zookeeper.server.NIOServerCnxn:639)
[2012-04-06 17:34:13,201] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x1368a38c37a0005, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:634)
[2012-04-06 17:34:13,264] WARN EndOfStreamException: Unable to read additional data from client sessionid 0x1368a38a86a0080, likely client has closed socket (org.apache.zookeeper.server.NIOServerCnxn:634)

Also, between setting the isShutdown variable and checking canStart, startup and shutdown can interleave. This can leave a ZooKeeper client connection open.
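One way to close this window is to make the state check and the state change a single critical section, so startup and shutdown cannot interleave. The sketch below assumes a simple lock-based lifecycle. SerializedServer, StubZkClient and the Stopped/Running states are hypothetical. They do not reflect the v3-delta patch or any later Kafka change. The Runnable lambda requires Scala 2.12+.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a ZooKeeper client connection.
final class StubZkClient {
  @volatile var open: Boolean = true
  def close(): Unit = { open = false }
}

// Hypothetical lifecycle states for the sketch.
sealed trait State
case object Stopped extends State
case object Running extends State

final class SerializedServer {
  private val lock = new ReentrantLock()
  private var state: State = Stopped
  private var zkClient: Option[StubZkClient] = None
  // Every client ever created, so we can check afterwards that none leaked.
  private val created = ArrayBuffer.empty[StubZkClient]

  private def withLock[A](body: => A): A = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Check-and-act under one lock: a concurrent shutdown() cannot run between
  // "may I start?" and "open the ZooKeeper client".
  def startup(): Boolean = withLock {
    if (state == Running) false
    else {
      val client = new StubZkClient
      created += client
      zkClient = Some(client)
      state = Running
      true
    }
  }

  def shutdown(): Boolean = withLock {
    if (state == Stopped) false
    else {
      zkClient.foreach(_.close())
      zkClient = None
      state = Stopped
      true
    }
  }

  def allClientsClosed: Boolean = withLock(created.forall(c => !c.open))
}

object SerializedLifecycleDemo {
  def main(args: Array[String]): Unit = {
    val server = new SerializedServer
    // Two threads keep starting the server while two others keep shutting it down.
    val threads = (1 to 4).map { i =>
      new Thread(() => (1 to 1000).foreach { _ =>
        if (i % 2 == 0) server.startup() else server.shutdown()
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    server.shutdown() // final teardown
    println(s"all ZooKeeper clients closed: ${server.allClientsClosed}")
  }
}
```

The trade-off is that startup() and shutdown() block each other. For a lifecycle that changes rarely, this is usually preferable to reasoning about every interleaving of separate flags.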
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320-v3-delta.patch, kafka-320-v3.patch, kafka-320.patch
>
>

        

[jira] [Updated] (KAFKA-320) testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException

Posted by "Neha Narkhede (Updated) (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/KAFKA-320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Neha Narkhede updated KAFKA-320:
--------------------------------

    Attachment: kafka-320-v3.patch

Filed KAFKA-328 to improve the startup and shutdown APIs of the Kafka server.

Kept everything from v2 except the more complex changes to the Kafka server.
                
> testZKSendWithDeadBroker fails intermittently due to ZKNodeExistsException
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-320
>                 URL: https://issues.apache.org/jira/browse/KAFKA-320
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.7, 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Critical
>         Attachments: kafka-320-v2.patch, kafka-320-v3.patch, kafka-320.patch
>
>