Posted to dev@kafka.apache.org by Elliot Metsger <em...@gmail.com> on 2017/10/03 19:35:06 UTC

TimeoutException: Timed out waiting for a node assignment. when AdminClient attempts to create topics

Hi everyone,

I'm new to Kafka and am psyched to get it working in our environment!
I'm on 0.11.0.0, and using spring-kafka 2.0.0.rc1.  I'm struggling to get
things working in our unit/integration tests.

I don't want to overload everyone with the context of the issues, but this
is the stacktrace I'm seeing (from the tail end of the Maven build):
https://gist.github.com/emetsger/05ef47ce15ca54b2735a65627992a2d0

The full build output is here: https://goo.gl/pwM8Lw

It appears that the AdminClient times out trying to create the topic for
the unit test:

13:54:40.964 [-client-thread | adminclient-1] DEBUG [KafkaAdminClient$Call] - Call(callName=createTopics, deadlineMs=1507053280963) timed out at 1507053280964 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:475)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:591)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutNewCalls(KafkaAdminClient.java:663)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:908)
    at java.lang.Thread.run(Thread.java:748)

The unit test uses the build-helper-maven-plugin to reserve a port for the
Kafka broker, in this case port 55383.  If you look at the gist, I've added
the properties used by the AdminClient and the broker, and they seem to be
configured properly (that is, the AdminClient is configured to talk to
127.0.0.1:55383, which is where the broker is listening).
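Comparing the two property sets can also be done in code, which helps when the port comes from a build-time reservation and can change between runs (the first log excerpt below dials 58230, not 55383). A minimal JDK-only sketch, assuming the AdminClient settings use the standard `bootstrap.servers` key and the broker settings use the `port` key from the @EmbeddedKafka brokerProperties; `BootstrapCheck` is a hypothetical helper, not part of Kafka or spring-kafka:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not a Kafka or spring-kafka API): checks whether the
// AdminClient's bootstrap.servers list includes the broker's configured port.
final class BootstrapCheck {

    /** Parses "host:port,host:port" into the list of port numbers. */
    static List<Integer> bootstrapPorts(String bootstrapServers) {
        List<Integer> ports = new ArrayList<>();
        for (String entry : bootstrapServers.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon < 0) {
                throw new IllegalArgumentException("No port in bootstrap entry: " + trimmed);
            }
            ports.add(Integer.parseInt(trimmed.substring(colon + 1)));
        }
        return ports;
    }

    /** True if bootstrap.servers (client side) mentions the broker's "port" value. */
    static boolean portsMatch(Properties clientProps, Properties brokerProps) {
        int brokerPort = Integer.parseInt(brokerProps.getProperty("port").trim());
        return bootstrapPorts(clientProps.getProperty("bootstrap.servers")).contains(brokerPort);
    }

    public static void main(String[] args) {
        Properties client = new Properties();
        client.setProperty("bootstrap.servers", "127.0.0.1:55383");
        Properties broker = new Properties();
        broker.setProperty("port", "55383");
        System.out.println(portsMatch(client, broker)); // true

        // A client pointed at 58230 (as in the first log excerpt) would not match.
        client.setProperty("bootstrap.servers", "127.0.0.1:58230");
        System.out.println(portsMatch(client, broker)); // false
    }
}
```

Agreement in configuration is necessary but not sufficient: in Kafka the legacy `port` broker setting is only used when `listeners` is not set, so the port a broker actually binds can differ from `port`.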

Another relevant note: I'm using spring-kafka, and Spring manages all the
Kafka-related beans: the producer, consumer, and broker.  The broker is
started using the @EmbeddedKafka annotation, which creates an embedded
ZooKeeper instance and a KafkaEmbedded broker instance:

@TestPropertySource(locations = { "classpath:/rmapcore.properties",
"classpath:/kafka-broker.properties" })
@EmbeddedKafka(topics = { "rmap-event-topic" }, brokerProperties = {
"log.dir=${kafka.broker.logs-dir}", "port=${kafka.broker.port}" })

The topic (`rmap-event-topic` in this case) should be created
automatically, and it looks like the AdminClient tries, but ultimately
fails, timing out while trying to connect to the broker:

15:21:34.977 [                          main] DEBUG [KafkaAdminClient$AdminClientRu] - adminclient-1: queueing Call(callName=createTopics, deadlineMs=1507058614976) with a timeout 120000 ms from now.
...
15:21:35.047 [-client-thread | adminclient-1] TRACE [NetworkClient] - Found least loaded node 127.0.0.1:58230 (id: -1 rack: null)
15:21:35.047 [-client-thread | adminclient-1] DEBUG [NetworkClient$DefaultMetadataU] - Initialize connection to node -1 for sending metadata request
15:21:35.047 [-client-thread | adminclient-1] DEBUG [NetworkClient] - Initiating connection to node -1 at 127.0.0.1:58230.
15:21:35.048 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.bytes-sent
15:21:35.048 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.bytes-received
15:21:35.049 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.latency
15:21:35.055 [-client-thread | adminclient-1] DEBUG [Selector] - Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:927)
    at java.lang.Thread.run(Thread.java:748)
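`java.net.ConnectException: Connection refused` means nothing accepted a TCP connection at that address when the AdminClient tried it. One quick way to check whether anything is listening on the reserved port at a given moment is a plain socket probe. A minimal JDK-only sketch; `PortProbe` is a hypothetical helper, and the 1000 ms connect timeout is an arbitrary choice:

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical helper: reports whether anything accepts TCP connections at host:port.
final class PortProbe {

    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true; // a listener accepted the connection
        } catch (IOException e) {
            return false; // refused (as in the log above), unreachable, or timed out
        }
    }

    public static void main(String[] args) throws IOException {
        // Bind a throwaway listener on an ephemeral loopback port to show both outcomes.
        ServerSocket server = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
        int port = server.getLocalPort();
        System.out.println(isListening("127.0.0.1", port, 1000)); // true
        server.close();
        System.out.println(isListening("127.0.0.1", port, 1000)); // false once nothing listens
    }
}
```

Calling something like `PortProbe.isListening("127.0.0.1", 55383, 1000)` right after the embedded broker starts would show whether the reserved port is actually bound.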

Polling continues, and ultimately times out:

15:23:34.981 [-client-thread | adminclient-1] DEBUG [KafkaAdminClient$Call] - Call(callName=createTopics, deadlineMs=1507058614976) timed out at 1507058614981 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:475)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:591)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutNewCalls(KafkaAdminClient.java:663)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:908)
    at java.lang.Thread.run(Thread.java:748)


Any help or advice is appreciated!

Thanks,
Elliot

Feel free to follow the steps below to reproduce locally, or just look at
the full build output here: https://goo.gl/pwM8Lw
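1. Because I'm using Spring release candidates in the build, you'll need to
add two repositories to your ~/.m2/settings.xml.  Maven only reads
<repositories> from a profile in settings.xml, so put them inside an active
profile (merging into your existing <settings> element):
    <settings>
      <profiles>
        <profile>
          <!-- any id works; it only has to match activeProfile below -->
          <id>spring-repos</id>
          <repositories>
            <repository>
              <id>spring-libs-release</id>
              <name>Spring Releases</name>
              <url>https://repo.spring.io/libs-release</url>
              <snapshots>
                <enabled>false</enabled>
              </snapshots>
            </repository>

            <repository>
              <id>spring-libs-snap</id>
              <name>Spring Snapshots</name>
              <url>https://repo.spring.io/libs-snapshot</url>
              <snapshots>
                <enabled>true</enabled>
              </snapshots>
            </repository>
          </repositories>
        </profile>
      </profiles>
      <activeProfiles>
        <activeProfile>spring-repos</activeProfile>
      </activeProfiles>
    </settings>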

2. Check out https://github.com/emetsger/spring-kafka, branch
property-placeholder-resolution.  Run `./gradlew clean install` from inside
the spring-kafka directory.
- This is the same as spring-kafka 2.0.0.rc1, but allows property
placeholders in the `brokerProperties` attribute of @EmbeddedKafka
(PR pending).

3. Check out https://github.com/emetsger/rmap.git,
branch kakfa-adminclient-timeout in a new directory.

4. Run `mvn clean install -Dcheckstyle.skip -Djavadoc.skip -DskipTests`
from the RMap directory.  The build will fail at the "Solr indexing"
(rmap-indexing-solr) module.  That's OK; ignore it.  This step just
populates your local Maven repository with the necessary artifacts (and
`rmap-indexing-solr` isn't needed to reproduce this issue).

5. Now that all the dependencies are installed, cd into the `core`
directory and run an example unit test to replicate the problem: `mvn test
-Dlogback.configurationFile=src/test/resources/logback-test.xml
-Dtest=ORMapEventCreationTest`
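Step 2 exists because stock spring-kafka 2.0.0.rc1 does not resolve placeholders such as `${kafka.broker.port}` in `brokerProperties`. The substitution the patched branch is meant to perform can be illustrated with a minimal stand-in resolver, assuming plain `${key}` syntax with no defaults or nesting. `PlaceholderDemo` is a hypothetical helper, not Spring's own resolver or the code in that branch:

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical stand-in for placeholder resolution; NOT Spring's implementation.
// Supports only plain ${key} references (no defaults, no nesting).
final class PlaceholderDemo {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${key} with props.getProperty(key); unknown keys are left untouched. */
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1));
            // Keep the raw placeholder when the key is missing, so it stays visible.
            String replacement = (value != null) ? value : m.group(0);
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("kafka.broker.port", "55383");
        System.out.println(resolve("port=${kafka.broker.port}", props));        // port=55383
        System.out.println(resolve("log.dir=${kafka.broker.logs-dir}", props)); // left unresolved
    }
}
```

Leaving unknown keys untouched, as this sketch does, makes an unresolved placeholder easy to spot in logged configuration.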

Re: TimeoutException: Timed out waiting for a node assignment. when AdminClient attempts to create topics

Posted by Elliot Metsger <em...@gmail.com>.
I always do this: post a detailed explanation and then find a
discrepancy.  The earlier message contains log snippets from a different
test run, so the ports don't match up with my text.  Here are log snippets
from the posted files:


Queuing the call:
13:52:40.968 [                          main] DEBUG [KafkaAdminClient$AdminClientRu] - adminclient-1: queueing Call(callName=createTopics, deadlineMs=1507053280963) with a timeout 120000 ms from now.

Encountering connection refused:
13:52:41.024 [-client-thread | adminclient-1] TRACE [NetworkClient] - Found least loaded node 127.0.0.1:55383 (id: -1 rack: null)
13:52:41.024 [-client-thread | adminclient-1] DEBUG [NetworkClient$DefaultMetadataU] - Initialize connection to node -1 for sending metadata request
13:52:41.025 [-client-thread | adminclient-1] DEBUG [NetworkClient] - Initiating connection to node -1 at 127.0.0.1:55383.
13:52:41.024 [nerName(PLAINTEXT)-PLAINTEXT-0] TRACE [Logging$class] - Socket server received response to send, registering for write and sending data: Response(Request(0,127.0.0.1:55388-127.0.0.1:55390,Session(User:ANONYMOUS,/127.0.0.1),null,142764461178823,ListenerName(PLAINTEXT),PLAINTEXT),Some(org.apache.kafka.common.network.NetworkSend@54b50f3c),SendAction)
13:52:41.025 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.bytes-sent
13:52:41.027 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.bytes-received
13:52:41.028 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.latency
13:52:41.029 [ller-0-to-broker-0-send-thread] TRACE [NetworkClient] - Completed receive from node 0, for key 6, received {error_code=0}
13:52:41.032 [-client-thread | adminclient-1] DEBUG [Selector] - Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:927)
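The broker-side TRACE line in this run identifies a connection as `127.0.0.1:55388-127.0.0.1:55390`. Assuming Kafka's socket server formats connection ids as `localHost:localPort-remoteHost:remotePort` (an assumption about the 0.11 internals, so treat the result as a hint rather than proof), the local half is the address the broker accepted that connection on, which can be compared with the 55383 the AdminClient dials. A minimal JDK-only sketch; `ConnectionIdParser` is a hypothetical helper:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper. ASSUMPTION: connection ids look like
// "localHost:localPort-remoteHost:remotePort" (IPv4 hosts only in this sketch).
final class ConnectionIdParser {

    private static final Pattern CONNECTION_ID = Pattern.compile(
            "(\\d{1,3}(?:\\.\\d{1,3}){3}):(\\d+)-(\\d{1,3}(?:\\.\\d{1,3}){3}):(\\d+)");

    /** Returns the local (broker-side) port of the first connection id in the line, or -1. */
    static int localPort(String logLine) {
        Matcher m = CONNECTION_ID.matcher(logLine);
        return m.find() ? Integer.parseInt(m.group(2)) : -1;
    }

    public static void main(String[] args) {
        // Fragment copied from the TRACE line above.
        String line = "Response(Request(0,127.0.0.1:55388-127.0.0.1:55390,"
                + "Session(User:ANONYMOUS,/127.0.0.1),null,142764461178823,"
                + "ListenerName(PLAINTEXT),PLAINTEXT)";
        int brokerPort = localPort(line);
        System.out.println(brokerPort);          // 55388
        System.out.println(brokerPort == 55383); // false: not the port the AdminClient dials
    }
}
```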

And finally the TimeoutException:

13:54:40.964 [-client-thread | adminclient-1] DEBUG [KafkaAdminClient$Call] - Call(callName=createTopics, deadlineMs=1507053280963) timed out at 1507053280964 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:475)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:591)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutNewCalls(KafkaAdminClient.java:663)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:908)
    at java.lang.Thread.run(Thread.java:748)
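The numbers in this run are consistent with the call simply waiting out its whole timeout: it was queued with `deadlineMs=1507053280963` and a 120000 ms timeout, and it was failed at `1507053280964`. A small worked check with `java.time.Instant`; the class name `CallTiming` is only for illustration, and all values are copied from the log lines above:

```java
import java.time.Instant;

// Worked check of the AdminClient timestamps in this run (values copied from the log).
final class CallTiming {

    static final long DEADLINE_MS = 1507053280963L;     // deadlineMs on the "queueing Call" line
    static final long TIMEOUT_MS = 120000L;             // "with a timeout 120000 ms from now"
    static final long TIMED_OUT_AT_MS = 1507053280964L; // "timed out at ..." value

    /** Epoch millis at which the call was queued: deadline minus timeout. */
    static long queuedAtMs() {
        return DEADLINE_MS - TIMEOUT_MS;
    }

    /** How long the call waited before the AdminClient failed it. */
    static long waitedMs() {
        return TIMED_OUT_AT_MS - queuedAtMs();
    }

    public static void main(String[] args) {
        System.out.println(Instant.ofEpochMilli(queuedAtMs())); // 2017-10-03T17:52:40.963Z
        System.out.println(Instant.ofEpochMilli(DEADLINE_MS));  // 2017-10-03T17:54:40.963Z
        System.out.println(waitedMs());                         // 120001
    }
}
```

17:52:40 UTC is 13:52:40 at UTC-4, matching the local timestamp on the "queueing Call" line, so the call waited roughly 120 s without ever being assigned a node.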


Full trace:

13:54:40.966 [                          main] ERROR [TestContextManager] - Caught exception while allowing TestExecutionListener [org.springframework.test.context.support.DependencyInjectionTestExecutionListener@72ccd81a] to prepare test instance [info.rmapproject.core.model.impl.openrdf.ORMapEventCreationTest@6d8792db]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:107)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:117)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:83)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:242)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:369)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:275)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:239)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:160)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:373)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:334)
    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:119)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:407)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaEmbedded': Invocation of init method failed; nested exception is java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1702)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:414)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:76)
    at org.springframework.test.context.support.AbstractContextLoader.customizeContext(AbstractContextLoader.java:187)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:127)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:60)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.delegateLoading(AbstractDelegatingSmartContextLoader.java:109)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.loadContext(AbstractDelegatingSmartContextLoader.java:246)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 29 common frames omitted
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
    at org.springframework.kafka.test.rule.KafkaEmbedded.before(KafkaEmbedded.java:293)
    at org.springframework.kafka.test.rule.KafkaEmbedded.afterPropertiesSet(KafkaEmbedded.java:218)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1761)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1698)
    ... 38 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
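The bottom of the trace shows the nesting: `KafkaEmbedded.before` blocks on the future from the topic creation, the future's `get()` wraps Kafka's `TimeoutException` in a `java.util.concurrent.ExecutionException`, and Spring wraps that again in a `BeanCreationException`. The sketch below reproduces the same shape with JDK types only, using `CompletableFuture`, `java.util.concurrent.TimeoutException`, and `IllegalStateException` as stand-ins for `KafkaFuture`, Kafka's own `TimeoutException`, and Spring's `BeanCreationException`, and walks the cause chain to the root:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Stand-in demo using JDK types instead of KafkaFuture / Kafka's TimeoutException.
final class CauseChainDemo {

    /** Follows getCause() links to the innermost throwable. */
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    /** Simulates a failed create-topics future and blocks on it the way a caller would. */
    static Throwable failingGet() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(new TimeoutException("Timed out waiting for a node assignment."));
        try {
            future.get();
            return null; // not reached: the future completed exceptionally
        } catch (ExecutionException e) {
            // IllegalStateException stands in for Spring's BeanCreationException wrapper.
            return new IllegalStateException("Invocation of init method failed", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable outer = failingGet();
        System.out.println(outer.getCause().getClass().getSimpleName()); // ExecutionException
        System.out.println(rootCause(outer).getMessage()); // Timed out waiting for a node assignment.
    }
}
```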



Fwd: TimeoutException: Timed out waiting for a node assignment. when AdminClient attempts to create topics

Posted by Elliot Metsger <em...@gmail.com>.
(Updated and cross-posted from dev@)

Hi everyone,

I'm new to Kafka and am psyched to get it working in our environment!
I'm on 0.11.0.0, and using spring-kafka 2.0.0.rc1.  I'm struggling to get
things working in our unit/integration tests.

I don't want to overload everyone with the context of the issues, but this
is the stacktrace I'm seeing (from the tail end of the Maven build):
https://gist.github.com/emetsger/05ef47ce15ca54b2735a65627992a2d0

The full build output is here: https://goo.gl/pwM8Lw

It appears that the AdminClient times out trying to create the topic for
the unit test:

13:54:40.964 [-client-thread | adminclient-1] DEBUG [KafkaAdminClient$Call] - Call(callName=createTopics, deadlineMs=1507053280963) timed out at 1507053280964 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:475)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:591)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutNewCalls(KafkaAdminClient.java:663)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:908)
    at java.lang.Thread.run(Thread.java:748)

The unit test uses the build-helper-maven-plugin to reserve a port for the
Kafka broker, in this case port 55383.  If you look at the gist, I've added
the properties used by the AdminClient and the broker, and they seem to be
configured properly (that is, the AdminClient is configured to talk to
127.0.0.1:55383, which is where the broker is listening).

Another relevant note: I'm using spring-kafka, and Spring manages all the
Kafka-related beans: the producer, consumer, and broker.  The broker is
started using the @EmbeddedKafka annotation, which creates an embedded
ZooKeeper instance and a KafkaEmbedded broker instance:

@TestPropertySource(locations = { "classpath:/rmapcore.properties",
"classpath:/kafka-broker.properties" })
@EmbeddedKafka(topics = { "rmap-event-topic" }, brokerProperties = {
"log.dir=${kafka.broker.logs-dir}", "port=${kafka.broker.port}" })

The topic (`rmap-event-topic` in this case) should be created
automatically, and it looks like the AdminClient tries, but ultimately
fails, timing out while trying to connect to the broker:

Queuing the call:
13:52:40.968 [                          main] DEBUG [KafkaAdminClient$AdminClientRu] - adminclient-1: queueing Call(callName=createTopics, deadlineMs=1507053280963) with a timeout 120000 ms from now.

Encountering connection refused:
13:52:41.024 [-client-thread | adminclient-1] TRACE [NetworkClient] - Found least loaded node 127.0.0.1:55383 (id: -1 rack: null)
13:52:41.024 [-client-thread | adminclient-1] DEBUG [NetworkClient$DefaultMetadataU] - Initialize connection to node -1 for sending metadata request
13:52:41.025 [-client-thread | adminclient-1] DEBUG [NetworkClient] - Initiating connection to node -1 at 127.0.0.1:55383.
13:52:41.024 [nerName(PLAINTEXT)-PLAINTEXT-0] TRACE [Logging$class] - Socket server received response to send, registering for write and sending data: Response(Request(0,127.0.0.1:55388-127.0.0.1:55390,Session(User:ANONYMOUS,/127.0.0.1),null,142764461178823,ListenerName(PLAINTEXT),PLAINTEXT),Some(org.apache.kafka.common.network.NetworkSend@54b50f3c),SendAction)
13:52:41.025 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.bytes-sent
13:52:41.027 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.bytes-received
13:52:41.028 [-client-thread | adminclient-1] DEBUG [Metrics] - Added sensor with name node--1.latency
13:52:41.029 [ller-0-to-broker-0-send-thread] TRACE [NetworkClient] - Completed receive from node 0, for key 6, received {error_code=0}
13:52:41.032 [-client-thread | adminclient-1] DEBUG [Selector] - Connection with /127.0.0.1 disconnected
java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:359)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:326)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:432)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:927)

Polling continues, and finally times out:

13:54:40.964 [-client-thread | adminclient-1] DEBUG [KafkaAdminClient$Call] - Call(callName=createTopics, deadlineMs=1507053280963) timed out at 1507053280964 after 1 attempt(s)
java.lang.Exception: TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.clients.admin.KafkaAdminClient$Call.fail(KafkaAdminClient.java:475)
    at org.apache.kafka.clients.admin.KafkaAdminClient$TimeoutProcessor.handleTimeouts(KafkaAdminClient.java:591)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.timeoutNewCalls(KafkaAdminClient.java:663)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:908)
    at java.lang.Thread.run(Thread.java:748)


Full trace:

13:54:40.966 [                          main] ERROR [
TestContextManager] - Caught exception while allowing TestExecutionListener
[org.springframework.test.context.support.DependencyInjectionTestExecuti
onListener@72ccd81a] to prepare test instance [info.rmapproject.core.model.
impl.openrdf.ORMapEventCreationTest@6d8792db]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:125)
    at org.springframework.test.context.support.DefaultTestContext.getApplicationContext(DefaultTestContext.java:107)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:117)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:83)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:242)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:227)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:246)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
    at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:369)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:275)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:239)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:160)
    at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:373)
    at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:334)
    at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:119)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:407)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaEmbedded': Invocation of init method failed; nested exception is java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1702)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:414)
    at org.springframework.kafka.test.context.EmbeddedKafkaContextCustomizer.customizeContext(EmbeddedKafkaContextCustomizer.java:76)
    at org.springframework.test.context.support.AbstractContextLoader.customizeContext(AbstractContextLoader.java:187)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:127)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:60)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.delegateLoading(AbstractDelegatingSmartContextLoader.java:109)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.loadContext(AbstractDelegatingSmartContextLoader.java:246)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContextInternal(DefaultCacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.cache.DefaultCacheAwareContextLoaderDelegate.loadContext(DefaultCacheAwareContextLoaderDelegate.java:117)
    ... 29 common frames omitted
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:213)
    at org.springframework.kafka.test.rule.KafkaEmbedded.before(KafkaEmbedded.java:293)
    at org.springframework.kafka.test.rule.KafkaEmbedded.afterPropertiesSet(KafkaEmbedded.java:218)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1761)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1698)
    ... 38 common frames omitted
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
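The trace above is layered: Spring's IllegalStateException wraps a BeanCreationException, which wraps the ExecutionException thrown by KafkaFutureImpl.get(), which in turn wraps the Kafka TimeoutException. The innermost cause is usually the informative one. The JDK-only sketch below reproduces that wrapping pattern and walks getCause() down to the root. It is an illustration under stated assumptions, not Kafka or Spring code: CompletableFuture stands in for KafkaFuture (both throw ExecutionException from get()), java.util.concurrent.TimeoutException stands in for org.apache.kafka.common.errors.TimeoutException, the Spring layers are collapsed into a single IllegalStateException, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of Kafka or spring-kafka.
public class RootCauseDemo {

    // Follow getCause() until the innermost throwable is reached.
    static Throwable rootCause(Throwable t) {
        Throwable current = t;
        while (current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    // Stand-in for the KafkaFuture returned by createTopics(): a future that
    // has already failed with a timeout. java.util.concurrent.TimeoutException
    // is used in place of Kafka's own TimeoutException.
    static CompletableFuture<Void> failedCreateTopicsResult() {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(
                new TimeoutException("Timed out waiting for a node assignment."));
        return future;
    }

    // Mimics the layering in the trace: get() throws ExecutionException,
    // which the caller wraps again (here in a single IllegalStateException).
    static Throwable simulateContextLoadFailure() {
        try {
            failedCreateTopicsResult().get();
            return null; // not reached: the future has already failed
        } catch (ExecutionException e) {
            return new IllegalStateException("Failed to load ApplicationContext", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        Throwable top = simulateContextLoadFailure();
        System.out.println("top:  " + top);
        System.out.println("root: " + rootCause(top));
    }
}
```

Printing the root cause directly is a quick way to skip past the framework wrappers when the full trace is long, as it is here.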

Any help or advice is appreciated!

Thanks,
Elliot

Feel free to follow the steps below to reproduce the problem locally, or
just look at the full build output here: https://goo.gl/pwM8Lw
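1. Because I'm using Spring release candidates in the build, you'll need to
add two repositories to your ~/.m2/settings.xml. In settings.xml,
repositories are only allowed inside a profile, so wrap them in one and
activate it (both elements go directly under <settings>; the profile id
is arbitrary):
    <profiles>
      <profile>
        <id>spring-repos</id>
        <repositories>
          <repository>
            <id>spring-libs-release</id>
            <name>Spring Releases</name>
            <url>https://repo.spring.io/libs-release</url>
            <snapshots>
              <enabled>false</enabled>
            </snapshots>
          </repository>

          <repository>
            <id>spring-libs-snap</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/libs-snapshot</url>
            <snapshots>
              <enabled>true</enabled>
            </snapshots>
          </repository>
        </repositories>
      </profile>
    </profiles>
    <activeProfiles>
      <activeProfile>spring-repos</activeProfile>
    </activeProfiles>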

2. Check out https://github.com/emetsger/spring-kafka, branch
property-placeholder-resolution, and run `./gradlew clean install` from
inside the spring-kafka directory.
- This is the same as spring-kafka 2.0.0.rc1, but allows property
placeholders in the `brokerProperties` attribute of @EmbeddedKafka
(PR pending).

3. In a new directory, check out https://github.com/emetsger/rmap.git,
branch kakfa-adminclient-timeout.

4. From the RMap directory, run
`mvn clean install -Dcheckstyle.skip -Djavadoc.skip -DskipTests`.
The build will fail at the "Solr indexing" (`rmap-indexing-solr`) module.
That's OK; ignore it. This step only populates your local Maven repository
with the necessary artifacts, and `rmap-indexing-solr` isn't needed to
reproduce this issue.

5. Now that all the dependencies are installed, cd into the `core`
directory and run an example unit test to reproduce the problem:
`mvn test -Dlogback.configurationFile=src/test/resources/logback-test.xml -Dtest=ORMapEventCreationTest`
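The message "Timed out waiting for a node assignment" suggests that the AdminClient never found a broker to send createTopics to before its deadline. One quick sanity check is whether anything accepts TCP connections on the configured broker address while the test is running, for example while paused at a breakpoint in KafkaEmbedded.before. The sketch below uses only the JDK. The class name BrokerPortCheck is hypothetical, and 127.0.0.1:55383 is just the address from this particular build; since the port is reserved by the build, it likely differs between runs. A successful connection only shows that something is listening there, not that the broker is healthy or advertises an address the client can reach.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic helper; not part of Kafka or spring-kafka.
public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within
    // timeoutMs. This only proves that some process is listening; it says
    // nothing about whether that process is a working Kafka broker.
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults mirror the address from this build's output (assumed);
        // pass host and port as arguments for a different run.
        String host = args.length > 0 ? args[0] : "127.0.0.1";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 55383;
        System.out.println(host + ":" + port + " listening: "
                + isListening(host, port, 2000));
    }
}
```

Usage: `javac BrokerPortCheck.java && java BrokerPortCheck 127.0.0.1 55383`, substituting the port reserved for the current build. If the port is not reachable, the broker (or its listener configuration) is the place to look; if it is, the broker's advertised address and the AdminClient's timeouts are the next suspects.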