Posted to users@kafka.apache.org by "Brem, Robert" <Ro...@adesso.ch> on 2016/08/12 13:22:49 UTC

consumer.poll() hangs indefinitely in docker container

Hi, I'm new to Kafka and to messaging in general.

I have a simple Java application that contains a consumer and a producer. It works on the host system, but when I run it in a Docker container (Kafka is not in the container; it still runs on the host), consumer.poll() hangs and does not return.
From inside the container, telnet shows that 172.17.0.1:9092 is open.

On startup in the Docker container, Kafka logs: Marking the coordinator ... dead for group ...

Can you give me a hint about where I should look?
Thanks!

That's the output on the host, with the working application:

15:03:42,657 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 63) ConsumerConfig values:
                metric.reporters = []
                metadata.max.age.ms = 300000
                partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
                reconnect.backoff.ms = 50
                sasl.kerberos.ticket.renew.window.factor = 0.8
                max.partition.fetch.bytes = 1048576
                bootstrap.servers = [172.17.0.1:9092]
                ssl.keystore.type = JKS
                enable.auto.commit = true
                sasl.mechanism = GSSAPI
                interceptor.classes = null
                exclude.internal.topics = true
                ssl.truststore.password = null
                client.id = consumer-1
                ssl.endpoint.identification.algorithm = null
                max.poll.records = 2147483647
                check.crcs = true
                request.timeout.ms = 40000
                heartbeat.interval.ms = 3000
                auto.commit.interval.ms = 5000
                receive.buffer.bytes = 65536
                ssl.truststore.type = JKS
                ssl.truststore.location = null
                ssl.keystore.password = null
                fetch.min.bytes = 1
                send.buffer.bytes = 131072
                value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
                group.id = pokertracker
                retry.backoff.ms = 100
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                ssl.trustmanager.algorithm = PKIX
                ssl.key.password = null
                fetch.max.wait.ms = 500
                sasl.kerberos.min.time.before.relogin = 60000
                connections.max.idle.ms = 540000
                session.timeout.ms = 10000
                metrics.num.samples = 2
                key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
                ssl.protocol = TLS
                ssl.provider = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.keystore.location = null
                ssl.cipher.suites = null
                security.protocol = PLAINTEXT
                ssl.keymanager.algorithm = SunX509
                metrics.sample.window.ms = 30000
                auto.offset.reset = latest

15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka version : 0.10.0.0
15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13
15:03:42,681 WARN  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Error registering AppInfo mbean: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
                at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
                at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
                at org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527)
                at org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)
                at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
                at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694)
                at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
                at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
                at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44)
                at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98)
                at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81)
                at org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126)
                at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171)
                at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96)
                at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
                at org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141)
                at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
                at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
                at org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123)
                at org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158)
                at org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181)
                at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
                at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
                at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
                at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
                at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
                at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
                at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
                at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
                at org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70)
                at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
                at org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72)
                at org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121)
                at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159)
                at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
                at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
                at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
                at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
                at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
                at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
                at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
                at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
                at org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71)
                at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
                at org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73)
                at org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140)
                at org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39)
                at org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275)
                at org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)
                at org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)
                at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
                at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
                at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)
                at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)
                at org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)
                at org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)
                at org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)
                at org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)
                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
                at java.lang.Thread.run(Thread.java:745)
                at org.jboss.threads.JBossThread.run(JBossThread.java:320)

15:03:42,815 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
15:03:43,221 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
15:03:43,223 INFO  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned partitions [] for group pokertracker
15:03:43,224 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker
15:03:43,246 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Successfully joined group pokertracker with generation 1
15:03:43,247 INFO  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions [test-0] for group pokertracker


And here is the log output from the Docker container:

13:13:33,627 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 61) ConsumerConfig values:
                metric.reporters = []
                metadata.max.age.ms = 300000
                partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
                reconnect.backoff.ms = 50
                sasl.kerberos.ticket.renew.window.factor = 0.8
                max.partition.fetch.bytes = 1048576
                bootstrap.servers = [172.17.0.1:9092]
                ssl.keystore.type = JKS
                enable.auto.commit = true
                sasl.mechanism = GSSAPI
                interceptor.classes = null
                exclude.internal.topics = true
                ssl.truststore.password = null
                client.id = consumer-1
                ssl.endpoint.identification.algorithm = null
                max.poll.records = 2147483647
                check.crcs = true
                request.timeout.ms = 40000
                heartbeat.interval.ms = 3000
                auto.commit.interval.ms = 5000
                receive.buffer.bytes = 65536
                ssl.truststore.type = JKS
                ssl.truststore.location = null
                ssl.keystore.password = null
                fetch.min.bytes = 1
                send.buffer.bytes = 131072
                value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
                group.id = pokertracker
                retry.backoff.ms = 100
                sasl.kerberos.kinit.cmd = /usr/bin/kinit
                sasl.kerberos.service.name = null
                sasl.kerberos.ticket.renew.jitter = 0.05
                ssl.trustmanager.algorithm = PKIX
                ssl.key.password = null
                fetch.max.wait.ms = 500
                sasl.kerberos.min.time.before.relogin = 60000
                connections.max.idle.ms = 540000
                session.timeout.ms = 10000
                metrics.num.samples = 2
                key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
                ssl.protocol = TLS
                ssl.provider = null
                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
                ssl.keystore.location = null
                ssl.cipher.suites = null
                security.protocol = PLAINTEXT
                ssl.keymanager.algorithm = SunX509
                metrics.sample.window.ms = 30000
                auto.offset.reset = latest

13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka version : 0.10.0.0
13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13
13:13:33,711 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
13:13:33,948 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
13:13:34,009 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker
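
One detail in both logs may be worth checking: the consumer bootstraps against 172.17.0.1:9092, but the coordinator is reported as contabo:9092. The following is a minimal sketch, assuming (the logs do not confirm this) that the name contabo may not resolve or be reachable from inside the container. It uses only the JDK to separate name resolution from TCP reachability. The class name BrokerReachability and the 2-second timeout are illustrative choices, not part of the application.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class BrokerReachability {

    /** Outcome of one check, so DNS failures can be told apart from TCP failures. */
    enum Result { OK, UNRESOLVED, UNREACHABLE }

    static Result check(String host, int port, int timeoutMillis) {
        InetAddress address;
        try {
            // Step 1: can this JVM resolve the name at all?
            address = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            return Result.UNRESOLVED;
        }
        // Step 2: can a TCP connection be opened within the timeout?
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address, port), timeoutMillis);
            return Result.OK;
        } catch (IOException e) {
            return Result.UNREACHABLE;
        }
    }

    public static void main(String[] args) {
        // Bootstrap address from the ConsumerConfig output.
        System.out.println("172.17.0.1:9092 -> " + check("172.17.0.1", 9092, 2000));
        // Coordinator address from the "Discovered coordinator" log line.
        System.out.println("contabo:9092    -> " + check("contabo", 9092, 2000));
    }
}
```

Run inside the container, a result of UNRESOLVED or UNREACHABLE for contabo alongside OK for 172.17.0.1 would suggest that the address the broker hands back to clients is not usable from there, even though the bootstrap address is.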


Here is the Java code:

@Startup
@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
@Singleton
public class InMemoryCache {

    @Inject
    KafkaConsumer<String, String> consumer;

    @Dedicated
    @Inject
    ExecutorService kafka;

    ...

    @PostConstruct
    public void onInit() {
...
        CompletableFuture
                .runAsync(this::handleKafkaEvent, kafka);
    }

    public void handleKafkaEvent() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                switch (record.topic()) {
                    case KafkaProvider.TOPIC:
                        System.out.println("record.value() = " + record.value());
                        List<CoreEvent> events = converter.convertToEvents(record.value());
                        for (CoreEvent event : events) {
                            handle(event);
                        }
                        break;
                    default:
                        // Include the offending topic so the error is diagnosable.
                        throw new IllegalArgumentException("Illegal message type: " + record.topic());
                }
            }
        }
    }

...


Re: consumer.poll() hangs indefinitely in docker container

Posted by Radoslaw Gruchalski <ra...@gruchalski.com>.
Is there a JIRA for it? Could you point to where the issue exists in the
code?

–
Best regards,
Radek Gruchalski
radek@gruchalski.com


On August 12, 2016 at 5:15:33 PM, Oleg Zhurakousky (
ozhurakousky@hortonworks.com) wrote:

It hangs indefinitely in any container. It’s a known issue and has been
brought up many times on this list, yet there is no fix for it.
The problem is that while poll() creates the illusion of being async,
and even lets you set a timeout, that is misleading once you look inside
its implementation. The first call it makes is to fetch topic metadata.
That call is not bounded by the timeout you pass in, so if the connection
to the broker is not available you are stuck: Kafka attempts to reconnect,
and there is no property to limit reconnect attempts, so it retries
indefinitely.
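
The general workaround for a call that may never return is to put a deadline around it from the outside. The following is a minimal sketch of that pattern using only the JDK. The class BoundedCall, the method callWithDeadline, and the sleeping stand-in for poll() are all hypothetical names introduced for illustration. It is not Kafka's API, and with a real KafkaConsumer, which is documented as not thread-safe apart from wakeup(), the timeout branch would more likely call consumer.wakeup() than rely on thread interruption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    /**
     * Runs a blocking call on the given executor and waits at most
     * timeoutMillis for it. Returns the call's result, or fallback if the
     * deadline passes first.
     */
    static <T> T callWithDeadline(ExecutorService executor, Callable<T> blockingCall,
                                  long timeoutMillis, T fallback)
            throws InterruptedException, ExecutionException {
        Future<T> future = executor.submit(blockingCall);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker thread. Whether the blocked call reacts to
            // interruption depends on the call itself.
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical stand-in for a poll() that does not return.
            Callable<String> stuck = () -> {
                Thread.sleep(60_000);
                return "records";
            };
            System.out.println(callWithDeadline(executor, stuck, 500, "timed out"));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The caller gets control back after the deadline and can log, retry, or shut down, instead of waiting on a thread that is stuck reconnecting.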

Cheers
Oleg

> On Aug 12, 2016, at 9:22 AM, Brem, Robert <Ro...@adesso.ch> wrote:
>
> Hy I'm new to Kafka and messaging at all.
>
> I have a simple java application that contains a consumer and a producer.
It is working on the host system but if I try to run it in a docker
container (Kafka is not in the container, it is still on the host)
consumer.poll() hangs up and does not return.
> telnet tells me that inside the container the host:port 172.17.0.1:9092
is open.
>
> In the docker container on startup Kafka tells me: Marking the
coordinator ... dead for group ...
>
> Can you give me a hint, in which direction I should look?
> Thanks!
>
> That's the output on the host, with the working application:
>
> 15:03:42,657 INFO [org.apache.kafka.clients.consumer.ConsumerConfig]
(ServerService Thread Pool -- 63) ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy =
[org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [172.17.0.1:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = true
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = consumer-1
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
> group.id = pokertracker
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 10000
> metrics.num.samples = 2
> key.deserializer = class
org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = latest
>
> 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 63) Kafka version : 0.10.0.0
> 15:03:42,680 INFO [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13
> 15:03:42,681 WARN [org.apache.kafka.common.utils.AppInfoParser]
(ServerService Thread Pool -- 63) Error registering AppInfo mbean:
javax.management.InstanceAlreadyExistsException:
kafka.consumer:type=app-info,id=consumer-1
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)

> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)

> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)

> at
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)

> at
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)

> at
org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527)

> at
org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)

> at
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)

> at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694)

> at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)

> at
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)

> at
com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44)

> at
com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29)

> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

> at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

> at java.lang.reflect.Method.invoke(Method.java:498)
> at
org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98)

> at
org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81)

> at
org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126)

> at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171)
> at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96)
> at
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)

> at
org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141)

> at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
> at
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)

> at
org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123)

> at
org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158)

> at
org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181)

> at
org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)

> at
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)

> at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
> at
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)

> at
org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)

> at
org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)

> at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
> at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
> at
org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70)

> at
org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)

> at
org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72)

> at
org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121)

> at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159)
> at
org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)

> at
org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)

> at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
> at
org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)

> at
org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)

> at
org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)

> at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
> at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
> at
org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71)

> at
org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)

> at
org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73)

> at
org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140)

> at
org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39)

> at
org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at
org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100)

> at
org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)

> at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275)
> at org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)
> at org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)
> at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
> at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
> at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
> at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)
> at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)
> at org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)
> at org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)
> at org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)
> at org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> at org.jboss.threads.JBossThread.run(JBossThread.java:320)
>
> 15:03:42,815 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
> 15:03:43,221 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
> 15:03:43,223 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned partitions [] for group pokertracker
> 15:03:43,224 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker
> 15:03:43,246 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Successfully joined group pokertracker with generation 1
> 15:03:43,247 INFO [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions [test-0] for group pokertracker
>
>
> And here is the log output from the Docker container:
>
> 13:13:33,627 INFO [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 61) ConsumerConfig values:
> metric.reporters = []
> metadata.max.age.ms = 300000
> partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
> reconnect.backoff.ms = 50
> sasl.kerberos.ticket.renew.window.factor = 0.8
> max.partition.fetch.bytes = 1048576
> bootstrap.servers = [172.17.0.1:9092]
> ssl.keystore.type = JKS
> enable.auto.commit = true
> sasl.mechanism = GSSAPI
> interceptor.classes = null
> exclude.internal.topics = true
> ssl.truststore.password = null
> client.id = consumer-1
> ssl.endpoint.identification.algorithm = null
> max.poll.records = 2147483647
> check.crcs = true
> request.timeout.ms = 40000
> heartbeat.interval.ms = 3000
> auto.commit.interval.ms = 5000
> receive.buffer.bytes = 65536
> ssl.truststore.type = JKS
> ssl.truststore.location = null
> ssl.keystore.password = null
> fetch.min.bytes = 1
> send.buffer.bytes = 131072
> value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> group.id = pokertracker
> retry.backoff.ms = 100
> sasl.kerberos.kinit.cmd = /usr/bin/kinit
> sasl.kerberos.service.name = null
> sasl.kerberos.ticket.renew.jitter = 0.05
> ssl.trustmanager.algorithm = PKIX
> ssl.key.password = null
> fetch.max.wait.ms = 500
> sasl.kerberos.min.time.before.relogin = 60000
> connections.max.idle.ms = 540000
> session.timeout.ms = 10000
> metrics.num.samples = 2
> key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
> ssl.protocol = TLS
> ssl.provider = null
> ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
> ssl.keystore.location = null
> ssl.cipher.suites = null
> security.protocol = PLAINTEXT
> ssl.keymanager.algorithm = SunX509
> metrics.sample.window.ms = 30000
> auto.offset.reset = latest
>
> 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka version : 0.10.0.0
> 13:13:33,650 INFO [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13
> 13:13:33,711 WARN [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
> 13:13:33,948 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
> 13:13:34,009 INFO [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker
>
>
> Here is the java code:
>
> @Startup
> @ConcurrencyManagement(ConcurrencyManagementType.BEAN)
> @Singleton
> public class InMemoryCache {
>
>     @Inject
>     KafkaConsumer<String, String> consumer;
>
>     @Dedicated
>     @Inject
>     ExecutorService kafka;
>
>     ...
>
>     @PostConstruct
>     public void onInit() {
>         ...
>         CompletableFuture
>                 .runAsync(this::handleKafkaEvent, kafka);
>     }
>
>     public void handleKafkaEvent() {
>         while (true) {
>             ConsumerRecords<String, String> records = consumer.poll(200);
>             for (ConsumerRecord<String, String> record : records) {
>                 switch (record.topic()) {
>                     case KafkaProvider.TOPIC:
>                         System.out.println("record.value() = " + record.value());
>                         List<CoreEvent> events = converter.convertToEvents(record.value());
>                         for (CoreEvent event : events) {
>                             handle(event);
>                         }
>                         break;
>                     default:
>                         throw new IllegalArgumentException("Illegal message type: ");
>                 }
>             }
>         }
>     }
>
> ...
>

Re: consumer.poll() hangs indefinitely in docker container

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
It is accurate, since it’s an API/implementation problem and therefore container independent. Sure, if everything is configured correctly and the broker is accessible, then things do work, but try to shut down the consumer when the broker is not accessible. And by "shut down" I do not mean shutting down the JVM via System.exit(); I am simply saying that poll() will block indefinitely, and so will close().

In fact, it’s a very common problem in Kafka components (see below):

https://issues.apache.org/jira/browse/KAFKA-3540
https://issues.apache.org/jira/browse/KAFKA-1894
https://issues.apache.org/jira/browse/KAFKA-3539
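
One generic way to keep a caller from hanging on a call that may never return is to run the call on a worker thread and wait for it with a timeout. The sketch below uses only the JDK; `BoundedCall`, `callWithTimeout` and the sleeping stand-in task are hypothetical names made up for illustration, and nothing here is Kafka API. Two caveats apply if this is adapted to a consumer: KafkaConsumer is not thread-safe, so the consumer would have to be used only from the worker thread, and cancelling the future merely interrupts that thread, which helps only if the blocked call reacts to interruption. The consumer's own `wakeup()` method is the documented way to abort a blocked `poll()` from another thread; whether it unblocks every case discussed here on 0.10.0.0 is not something this sketch verifies.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedCall {

    /**
     * Runs a possibly blocking call on the given executor and waits at most
     * timeoutMs for its result. Returns fallback if the call did not finish in time.
     */
    public static <T> T callWithTimeout(ExecutorService executor, Callable<T> call,
                                        long timeoutMs, T fallback)
            throws InterruptedException, ExecutionException {
        Future<T> future = executor.submit(call);
        try {
            return future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Interrupt the worker thread. This frees the worker only if the
            // blocked call responds to interruption.
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical stand-in for a call that never returns on its own.
            Callable<String> stuck = () -> {
                Thread.sleep(Long.MAX_VALUE);
                return "records";
            };
            String result = callWithTimeout(executor, stuck, 200, "timed out");
            System.out.println(result); // prints "timed out"
        } finally {
            executor.shutdownNow();
        }
    }
}
```

The caller regains control after the timeout either way; what happens to the worker thread depends entirely on how the blocked call handles interruption.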

Cheers
Oleg

On Aug 16, 2016, at 5:06 AM, Jaikiran Pai <ja...@gmail.com> wrote:

> On Friday 12 August 2016 08:45 PM, Oleg Zhurakousky wrote:
>> It hangs indefinitely in any container.
>
> I don't think that's accurate. We have been running Kafka brokers and consumers/producers in docker containers for a while now and they are functional. Of course, you need to make sure you use the IP addresses instead of localhost/127.0.0.1 to make sure that the brokers are accessible to the consumers/producers and you don't run into the situation that you explain about the broker connection not happening successfully.
>
> By the way, I am not saying that the consumer.poll() doesn't have that issue you state.
>
> -Jaikiran
>
>> It’s a known issue and has been brought up many times on this list, yet there is no fix for it.
>> The problem is that poll() creates the illusion of being asynchronous and even lets you set a timeout, which is very misleading once you look inside its implementation. The first call it makes is to fetch topic metadata. That call is not bounded by the timeout you pass to poll(), so if the connection to the broker is not available you’re dead: Kafka attempts to reconnect, and there is no property to limit the number of reconnect attempts, so it attempts to reconnect indefinitely.
>>
>> Cheers
>> Oleg

Re: consumer.poll() hangs indefinitely in docker container

Posted by Jaikiran Pai <ja...@gmail.com>.
On Friday 12 August 2016 08:45 PM, Oleg Zhurakousky wrote:
> It hangs indefinitely in any container.

I don't think that's accurate. We have been running Kafka brokers and 
consumers/producers in docker containers for a while now and they are 
functional. Of course, you need to make sure you use the IP addresses 
instead of localhost/127.0.0.1 to make sure that the brokers are 
accessible to the consumers/producers and you don't run into the 
situation that you explain about the broker connection not happening 
successfully.
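
The logs in this thread show the consumer bootstrapping against 172.17.0.1:9092 but then being handed the coordinator address contabo:9092, which it marks dead inside the container. One possible explanation (an assumption, the thread does not confirm it) is that the host name `contabo` does not resolve, or is not reachable, from inside the container, even though the bootstrap IP is. The small JDK-only probe below checks both addresses for name resolution and a TCP connect with a bounded timeout. `BrokerProbe`, `probe` and `Result` are hypothetical names for this sketch; the addresses are the ones from the logs.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;

public class BrokerProbe {

    public enum Result { UNRESOLVED, UNREACHABLE, REACHABLE }

    /** Resolves the host name, then attempts a TCP connect with a bounded timeout. */
    public static Result probe(String host, int port, int timeoutMs) {
        InetAddress address;
        try {
            address = InetAddress.getByName(host);
        } catch (UnknownHostException e) {
            // The name cannot be resolved from where this code runs.
            return Result.UNRESOLVED;
        }
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(address, port), timeoutMs);
            return Result.REACHABLE;
        } catch (IOException e) {
            // Covers connection refused, connect timeout and routing errors.
            return Result.UNREACHABLE;
        }
    }

    public static void main(String[] args) {
        // Defaults are the bootstrap address and the coordinator host name
        // that appear in the logs; pass other hosts as arguments if needed.
        String[] hosts = args.length > 0 ? args : new String[] {"172.17.0.1", "contabo"};
        for (String host : hosts) {
            System.out.println(host + ":9092 -> " + probe(host, 9092, 2000));
        }
    }
}
```

Run inside the container, a result other than REACHABLE for `contabo` would point at the address the broker advertises to clients (the broker's `advertised.listeners` setting would be the place to look) rather than at the consumer code.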

By the way, I am not saying that the consumer.poll() doesn't have that 
issue you state.

-Jaikiran


> It’s a known issue and has been brought up many times on this list, yet there is no fix for it.
> The problem is that poll() creates the illusion of being asynchronous and even lets you set a timeout, which is very misleading once you look inside its implementation. The first call it makes is to fetch topic metadata. That call is not bounded by the timeout you pass to poll(), so if the connection to the broker is not available you’re dead: Kafka attempts to reconnect, and there is no property to limit the number of reconnect attempts, so it attempts to reconnect indefinitely.
>
> Cheers
> Oleg
>
>> On Aug 12, 2016, at 9:22 AM, Brem, Robert <Ro...@adesso.ch> wrote:
>>
>> Hi, I'm new to Kafka and to messaging in general.
>>
>> I have a simple java application that contains a consumer and a producer. It is working on the host system but if I try to run it in a docker container (Kafka is not in the container, it is still on the host) consumer.poll() hangs up and does not return.
>> telnet tells me that inside the container the host:port 172.17.0.1:9092 is open.
>>
>> In the docker container on startup Kafka tells me: Marking the coordinator ... dead for group ...
>>
>> Can you give me a hint, in which direction I should look?
>> Thanks!
>>
>> That's the output on the host, with the working application:
>>
>> 15:03:42,657 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 63) ConsumerConfig values:
>>                 metric.reporters = []
>>                 metadata.max.age.ms = 300000
>>                 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>                 reconnect.backoff.ms = 50
>>                 sasl.kerberos.ticket.renew.window.factor = 0.8
>>                 max.partition.fetch.bytes = 1048576
>>                 bootstrap.servers = [172.17.0.1:9092]
>>                 ssl.keystore.type = JKS
>>                 enable.auto.commit = true
>>                 sasl.mechanism = GSSAPI
>>                 interceptor.classes = null
>>                 exclude.internal.topics = true
>>                 ssl.truststore.password = null
>>                 client.id = consumer-1
>>                 ssl.endpoint.identification.algorithm = null
>>                 max.poll.records = 2147483647
>>                 check.crcs = true
>>                 request.timeout.ms = 40000
>>                 heartbeat.interval.ms = 3000
>>                 auto.commit.interval.ms = 5000
>>                 receive.buffer.bytes = 65536
>>                 ssl.truststore.type = JKS
>>                 ssl.truststore.location = null
>>                 ssl.keystore.password = null
>>                 fetch.min.bytes = 1
>>                 send.buffer.bytes = 131072
>>                 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>                 group.id = pokertracker
>>                 retry.backoff.ms = 100
>>                 sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>                 sasl.kerberos.service.name = null
>>                 sasl.kerberos.ticket.renew.jitter = 0.05
>>                 ssl.trustmanager.algorithm = PKIX
>>                 ssl.key.password = null
>>                 fetch.max.wait.ms = 500
>>                 sasl.kerberos.min.time.before.relogin = 60000
>>                 connections.max.idle.ms = 540000
>>                 session.timeout.ms = 10000
>>                 metrics.num.samples = 2
>>                 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>                 ssl.protocol = TLS
>>                 ssl.provider = null
>>                 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>                 ssl.keystore.location = null
>>                 ssl.cipher.suites = null
>>                 security.protocol = PLAINTEXT
>>                 ssl.keymanager.algorithm = SunX509
>>                 metrics.sample.window.ms = 30000
>>                 auto.offset.reset = latest
>>
>> 15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka version : 0.10.0.0
>> 15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13
>> 15:03:42,681 WARN  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Error registering AppInfo mbean: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>>                 at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>>                 at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>>                 at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>>                 at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>>                 at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>>                 at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>>                 at org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527)
>>                 at org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)
>>                 at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
>>                 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694)
>>                 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
>>                 at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
>>                 at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44)
>>                 at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29)
>>                 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>                 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>                 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>                 at java.lang.reflect.Method.invoke(Method.java:498)
>>                 at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98)
>>                 at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81)
>>                 at org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126)
>>                 at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171)
>>                 at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96)
>>                 at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
>>                 at org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141)
>>                 at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
>>                 at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
>>                 at org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123)
>>                 at org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158)
>>                 at org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181)
>>                 at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
>>                 at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
>>                 at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
>>                 at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
>>                 at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
>>                 at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
>>                 at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
>>                 at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
>>                 at org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70)
>>                 at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
>>                 at org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72)
>>                 at org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121)
>>                 at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159)
>>                 at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
>>                 at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
>>                 at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
>>                 at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
>>                 at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
>>                 at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
>>                 at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
>>                 at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
>>                 at org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71)
>>                 at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
>>                 at org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73)
>>                 at org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140)
>>                 at org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39)
>>                 at org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275)
>>                 at org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)
>>                 at org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)
>>                 at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
>>                 at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>>                 at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
>>                 at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)
>>                 at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)
>>                 at org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)
>>                 at org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)
>>                 at org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)
>>                 at org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)
>>                 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>                 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>                 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>                 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>                 at java.lang.Thread.run(Thread.java:745)
>>                 at org.jboss.threads.JBossThread.run(JBossThread.java:320)
>>
>> 15:03:42,815 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
>> 15:03:43,221 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
>> 15:03:43,223 INFO  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned partitions [] for group pokertracker
>> 15:03:43,224 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker
>> 15:03:43,246 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Successfully joined group pokertracker with generation 1
>> 15:03:43,247 INFO  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions [test-0] for group pokertracker
>>
>>
>> And here is the log output from the Docker container:
>>
>> 13:13:33,627 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 61) ConsumerConfig values:
>>                 metric.reporters = []
>>                 metadata.max.age.ms = 300000
>>                 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>>                 reconnect.backoff.ms = 50
>>                 sasl.kerberos.ticket.renew.window.factor = 0.8
>>                 max.partition.fetch.bytes = 1048576
>>                 bootstrap.servers = [172.17.0.1:9092]
>>                 ssl.keystore.type = JKS
>>                 enable.auto.commit = true
>>                 sasl.mechanism = GSSAPI
>>                 interceptor.classes = null
>>                 exclude.internal.topics = true
>>                 ssl.truststore.password = null
>>                 client.id = consumer-1
>>                 ssl.endpoint.identification.algorithm = null
>>                 max.poll.records = 2147483647
>>                 check.crcs = true
>>                 request.timeout.ms = 40000
>>                 heartbeat.interval.ms = 3000
>>                 auto.commit.interval.ms = 5000
>>                 receive.buffer.bytes = 65536
>>                 ssl.truststore.type = JKS
>>                 ssl.truststore.location = null
>>                 ssl.keystore.password = null
>>                 fetch.min.bytes = 1
>>                 send.buffer.bytes = 131072
>>                 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>                 group.id = pokertracker
>>                 retry.backoff.ms = 100
>>                 sasl.kerberos.kinit.cmd = /usr/bin/kinit
>>                 sasl.kerberos.service.name = null
>>                 sasl.kerberos.ticket.renew.jitter = 0.05
>>                 ssl.trustmanager.algorithm = PKIX
>>                 ssl.key.password = null
>>                 fetch.max.wait.ms = 500
>>                 sasl.kerberos.min.time.before.relogin = 60000
>>                 connections.max.idle.ms = 540000
>>                 session.timeout.ms = 10000
>>                 metrics.num.samples = 2
>>                 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>>                 ssl.protocol = TLS
>>                 ssl.provider = null
>>                 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>>                 ssl.keystore.location = null
>>                 ssl.cipher.suites = null
>>                 security.protocol = PLAINTEXT
>>                 ssl.keymanager.algorithm = SunX509
>>                 metrics.sample.window.ms = 30000
>>                 auto.offset.reset = latest
>>
>> 13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka version : 0.10.0.0
>> 13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13
>> 13:13:33,711 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
>> 13:13:33,948 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
>> 13:13:34,009 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker
>>
>>
>> Here is the Java code:
>>
>> @Startup
>> @ConcurrencyManagement(ConcurrencyManagementType.BEAN)
>> @Singleton
>> public class InMemoryCache {
>>
>>     @Inject
>>     KafkaConsumer<String, String> consumer;
>>
>>     @Dedicated
>>     @Inject
>>     ExecutorService kafka;
>>
>>     ...
>>
>>     @PostConstruct
>>     public void onInit() {
>> ...
>>         CompletableFuture
>>                 .runAsync(this::handleKafkaEvent, kafka);
>>     }
>>
>>     public void handleKafkaEvent() {
>>         while (true) {
>>             ConsumerRecords<String, String> records = consumer.poll(200);
>>             for (ConsumerRecord<String, String> record : records) {
>>                 switch (record.topic()) {
>>                     case KafkaProvider.TOPIC:
>>                         System.out.println("record.value() = " + record.value());
>>                         List<CoreEvent> events = converter.convertToEvents(record.value());
>>                         for (CoreEvent event : events) {
>>                             handle(event);
>>                         }
>>                         break;
>>                     default:
>>                         throw new IllegalArgumentException("Illegal message type: " + record.topic());
>>                 }
>>             }
>>         }
>>     }
>>
>> ...
>>


Re: consumer.poll() hangs indefinitely in docker container

Posted by Oleg Zhurakousky <oz...@hortonworks.com>.
It hangs indefinitely in any container. It’s a known issue and has been brought up many times on this list, yet there is no fix for it.
The problem is that while poll() creates the illusion of being asynchronous, and even lets you set a timeout, that is very misleading once you look inside its implementation. The first call it makes is to fetch topic metadata. That call is not covered by the timeout you pass in, so if the connection to the broker is not available you’re dead: Kafka attempts to reconnect, and there is no property to limit the number of reconnect attempts, so it retries indefinitely.
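
Until that changes, one way to keep a stuck call from blocking the application forever is to put your own deadline around it. The following is only a minimal sketch under stated assumptions: PollWatchdog and callWithTimeout are hypothetical names, not part of the Kafka client, and the lambdas stand in for consumer.poll() so the example runs with the JDK alone.

```java
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client API.
final class PollWatchdog {

    // Runs the call on a daemon worker thread and waits at most timeoutMs for it.
    // Returns Optional.empty() if the call did not finish in time.
    static <T> Optional<T> callWithTimeout(Callable<T> call, long timeoutMs) {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "poll-watchdog");
            t.setDaemon(true); // a stuck call must not keep the JVM alive
            return t;
        });
        Future<T> future = worker.submit(call);
        try {
            return Optional.ofNullable(future.get(timeoutMs, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            future.cancel(true); // interrupts the worker; the call may ignore it
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the caller's interrupt flag
            future.cancel(true);
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("call failed", e.getCause());
        } finally {
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a poll() that returns promptly.
        Optional<String> fast = callWithTimeout(() -> "record", 1_000);
        // Stand-in for a poll() that is stuck waiting for the broker.
        Optional<String> stuck = callWithTimeout(() -> {
            Thread.sleep(60_000);
            return "never";
        }, 200);
        System.out.println("fast=" + fast.isPresent() + " stuck=" + stuck.isPresent());
    }
}
```

Running main prints fast=true stuck=false. Note the limits of this approach: the deadline only frees the calling thread, and a real consumer may still be busy inside the abandoned call, so it should not be polled again from another thread. The KafkaConsumer Javadoc describes wakeup() as the method that is safe to call from another thread to abort a long poll; whether it also breaks out of the reconnect loop in 0.10.0.0 is not something this sketch verifies.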

Cheers
Oleg

> On Aug 12, 2016, at 9:22 AM, Brem, Robert <Ro...@adesso.ch> wrote:
> 
> Hi, I'm new to Kafka and to messaging in general.
> 
> I have a simple Java application that contains a consumer and a producer. It works on the host system, but if I try to run it in a Docker container (Kafka is not in the container, it is still on the host), consumer.poll() hangs and does not return.
> telnet tells me that inside the container the host:port 172.17.0.1:9092 is open.
> 
> In the Docker container on startup Kafka tells me: Marking the coordinator ... dead for group ...
> 
> Can you give me a hint as to which direction I should look in?
> Thanks!
> 
> That's the output on the host, with the working application:
> 
> 15:03:42,657 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 63) ConsumerConfig values:
>                metric.reporters = []
>                metadata.max.age.ms = 300000
>                partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>                reconnect.backoff.ms = 50
>                sasl.kerberos.ticket.renew.window.factor = 0.8
>                max.partition.fetch.bytes = 1048576
>                bootstrap.servers = [172.17.0.1:9092]
>                ssl.keystore.type = JKS
>                enable.auto.commit = true
>                sasl.mechanism = GSSAPI
>                interceptor.classes = null
>                exclude.internal.topics = true
>                ssl.truststore.password = null
>                client.id = consumer-1
>                ssl.endpoint.identification.algorithm = null
>                max.poll.records = 2147483647
>                check.crcs = true
>                request.timeout.ms = 40000
>                heartbeat.interval.ms = 3000
>                auto.commit.interval.ms = 5000
>                receive.buffer.bytes = 65536
>                ssl.truststore.type = JKS
>                ssl.truststore.location = null
>                ssl.keystore.password = null
>                fetch.min.bytes = 1
>                send.buffer.bytes = 131072
>                value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>                group.id = pokertracker
>                retry.backoff.ms = 100
>                sasl.kerberos.kinit.cmd = /usr/bin/kinit
>                sasl.kerberos.service.name = null
>                sasl.kerberos.ticket.renew.jitter = 0.05
>                ssl.trustmanager.algorithm = PKIX
>                ssl.key.password = null
>                fetch.max.wait.ms = 500
>                sasl.kerberos.min.time.before.relogin = 60000
>                connections.max.idle.ms = 540000
>                session.timeout.ms = 10000
>                metrics.num.samples = 2
>                key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>                ssl.protocol = TLS
>                ssl.provider = null
>                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>                ssl.keystore.location = null
>                ssl.cipher.suites = null
>                security.protocol = PLAINTEXT
>                ssl.keymanager.algorithm = SunX509
>                metrics.sample.window.ms = 30000
>                auto.offset.reset = latest
> 
> 15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka version : 0.10.0.0
> 15:03:42,680 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Kafka commitId : b8642491e78c5a13
> 15:03:42,681 WARN  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 63) Error registering AppInfo mbean: javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=consumer-1
>                at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
>                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
>                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
>                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
>                at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
>                at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
>                at org.jboss.as.jmx.PluggableMBeanServerImpl$TcclMBeanServer.registerMBean(PluggableMBeanServerImpl.java:1527)
>                at org.jboss.as.jmx.PluggableMBeanServerImpl.registerMBean(PluggableMBeanServerImpl.java:871)
>                at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
>                at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:694)
>                at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:587)
>                at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:569)
>                at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.createConsumer(KafkaProvider.java:44)
>                at com.optimist.pokerstats.pokertracker.kafka.control.KafkaProvider.init(KafkaProvider.java:29)
>                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>                at java.lang.reflect.Method.invoke(Method.java:498)
>                at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.invokeMethods(DefaultLifecycleCallbackInvoker.java:98)
>                at org.jboss.weld.injection.producer.DefaultLifecycleCallbackInvoker.postConstruct(DefaultLifecycleCallbackInvoker.java:81)
>                at org.jboss.weld.injection.producer.BasicInjectionTarget.postConstruct(BasicInjectionTarget.java:126)
>                at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:171)
>                at org.jboss.weld.context.AbstractContext.get(AbstractContext.java:96)
>                at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
>                at org.jboss.weld.bean.ContextualInstanceStrategy$ApplicationScopedContextualInstanceStrategy.get(ContextualInstanceStrategy.java:141)
>                at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
>                at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
>                at org.jboss.weld.injection.producer.AbstractMemberProducer.getReceiver(AbstractMemberProducer.java:123)
>                at org.jboss.weld.injection.producer.AbstractMemberProducer.produce(AbstractMemberProducer.java:158)
>                at org.jboss.weld.bean.AbstractProducerBean.create(AbstractProducerBean.java:181)
>                at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
>                at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
>                at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
>                at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
>                at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
>                at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
>                at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
>                at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
>                at org.jboss.weld.injection.producer.ResourceInjector$1.proceed(ResourceInjector.java:70)
>                at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
>                at org.jboss.weld.injection.producer.ResourceInjector.inject(ResourceInjector.java:72)
>                at org.jboss.weld.injection.producer.BasicInjectionTarget.inject(BasicInjectionTarget.java:121)
>                at org.jboss.weld.bean.ManagedBean.create(ManagedBean.java:159)
>                at org.jboss.weld.context.unbound.DependentContextImpl.get(DependentContextImpl.java:70)
>                at org.jboss.weld.bean.ContextualInstanceStrategy$DefaultContextualInstanceStrategy.get(ContextualInstanceStrategy.java:101)
>                at org.jboss.weld.bean.ContextualInstance.get(ContextualInstance.java:50)
>                at org.jboss.weld.manager.BeanManagerImpl.getReference(BeanManagerImpl.java:742)
>                at org.jboss.weld.manager.BeanManagerImpl.getInjectableReference(BeanManagerImpl.java:842)
>                at org.jboss.weld.injection.FieldInjectionPoint.inject(FieldInjectionPoint.java:92)
>                at org.jboss.weld.util.Beans.injectBoundFields(Beans.java:378)
>                at org.jboss.weld.util.Beans.injectFieldsAndInitializers(Beans.java:389)
>                at org.jboss.weld.injection.producer.DefaultInjector$1.proceed(DefaultInjector.java:71)
>                at org.jboss.weld.injection.InjectionContextImpl.run(InjectionContextImpl.java:48)
>                at org.jboss.weld.injection.producer.DefaultInjector.inject(DefaultInjector.java:73)
>                at org.jboss.weld.injection.producer.ejb.SessionBeanInjectionTarget.inject(SessionBeanInjectionTarget.java:140)
>                at org.jboss.as.weld.injection.WeldInjectionContext.inject(WeldInjectionContext.java:39)
>                at org.jboss.as.weld.injection.WeldInjectionInterceptor.processInvocation(WeldInjectionInterceptor.java:51)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.ee.component.AroundConstructInterceptorFactory$1.processInvocation(AroundConstructInterceptorFactory.java:28)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.weld.injection.WeldInterceptorInjectionInterceptor.processInvocation(WeldInterceptorInjectionInterceptor.java:56)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.weld.ejb.Jsr299BindingsCreateInterceptor.processInvocation(Jsr299BindingsCreateInterceptor.java:100)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.ee.component.NamespaceContextInterceptor.processInvocation(NamespaceContextInterceptor.java:50)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.ejb3.tx.CMTTxInterceptor.invokeInOurTx(CMTTxInterceptor.java:275)
>                at org.jboss.as.ejb3.tx.CMTTxInterceptor.requiresNew(CMTTxInterceptor.java:349)
>                at org.jboss.as.ejb3.tx.LifecycleCMTTxInterceptor.processInvocation(LifecycleCMTTxInterceptor.java:66)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.weld.injection.WeldInjectionContextInterceptor.processInvocation(WeldInjectionContextInterceptor.java:43)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.ejb3.component.interceptors.CurrentInvocationContextInterceptor.processInvocation(CurrentInvocationContextInterceptor.java:41)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.as.ee.concurrent.ConcurrentContextInterceptor.processInvocation(ConcurrentContextInterceptor.java:45)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.invocation.ContextClassLoaderInterceptor.processInvocation(ContextClassLoaderInterceptor.java:64)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.invocation.InterceptorContext.run(InterceptorContext.java:356)
>                at org.jboss.invocation.PrivilegedWithCombinerInterceptor.processInvocation(PrivilegedWithCombinerInterceptor.java:80)
>                at org.jboss.invocation.InterceptorContext.proceed(InterceptorContext.java:340)
>                at org.jboss.invocation.ChainedInterceptor.processInvocation(ChainedInterceptor.java:61)
>                at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:161)
>                at org.jboss.as.ee.component.BasicComponent.constructComponentInstance(BasicComponent.java:134)
>                at org.jboss.as.ee.component.BasicComponent.createInstance(BasicComponent.java:88)
>                at org.jboss.as.ejb3.component.singleton.SingletonComponent.getComponentInstance(SingletonComponent.java:124)
>                at org.jboss.as.ejb3.component.singleton.SingletonComponent.start(SingletonComponent.java:138)
>                at org.jboss.as.ee.component.ComponentStartService$1.run(ComponentStartService.java:54)
>                at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>                at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>                at java.lang.Thread.run(Thread.java:745)
>                at org.jboss.threads.JBossThread.run(JBossThread.java:320)
> 
> 15:03:42,815 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
> 15:03:43,221 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
> 15:03:43,223 INFO  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Revoking previously assigned partitions [] for group pokertracker
> 15:03:43,224 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) (Re-)joining group pokertracker
> 15:03:43,246 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Successfully joined group pokertracker with generation 1
> 15:03:43,247 INFO  [org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] (EE-ManagedThreadFactory-default-Thread-2) Setting newly assigned partitions [test-0] for group pokertracker
> 
> 
> And here is the log output from the Docker container:
> 
> 13:13:33,627 INFO  [org.apache.kafka.clients.consumer.ConsumerConfig] (ServerService Thread Pool -- 61) ConsumerConfig values:
>                metric.reporters = []
>                metadata.max.age.ms = 300000
>                partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
>                reconnect.backoff.ms = 50
>                sasl.kerberos.ticket.renew.window.factor = 0.8
>                max.partition.fetch.bytes = 1048576
>                bootstrap.servers = [172.17.0.1:9092]
>                ssl.keystore.type = JKS
>                enable.auto.commit = true
>                sasl.mechanism = GSSAPI
>                interceptor.classes = null
>                exclude.internal.topics = true
>                ssl.truststore.password = null
>                client.id = consumer-1
>                ssl.endpoint.identification.algorithm = null
>                max.poll.records = 2147483647
>                check.crcs = true
>                request.timeout.ms = 40000
>                heartbeat.interval.ms = 3000
>                auto.commit.interval.ms = 5000
>                receive.buffer.bytes = 65536
>                ssl.truststore.type = JKS
>                ssl.truststore.location = null
>                ssl.keystore.password = null
>                fetch.min.bytes = 1
>                send.buffer.bytes = 131072
>                value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>                group.id = pokertracker
>                retry.backoff.ms = 100
>                sasl.kerberos.kinit.cmd = /usr/bin/kinit
>                sasl.kerberos.service.name = null
>                sasl.kerberos.ticket.renew.jitter = 0.05
>                ssl.trustmanager.algorithm = PKIX
>                ssl.key.password = null
>                fetch.max.wait.ms = 500
>                sasl.kerberos.min.time.before.relogin = 60000
>                connections.max.idle.ms = 540000
>                session.timeout.ms = 10000
>                metrics.num.samples = 2
>                key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
>                ssl.protocol = TLS
>                ssl.provider = null
>                ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
>                ssl.keystore.location = null
>                ssl.cipher.suites = null
>                security.protocol = PLAINTEXT
>                ssl.keymanager.algorithm = SunX509
>                metrics.sample.window.ms = 30000
>                auto.offset.reset = latest
> 
> 13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka version : 0.10.0.0
> 13:13:33,650 INFO  [org.apache.kafka.common.utils.AppInfoParser] (ServerService Thread Pool -- 61) Kafka commitId : b8642491e78c5a13
> 13:13:33,711 WARN  [com.datastax.driver.core.Cluster] (cluster1-worker-1) Re-preparing already prepared query select DATA, VERSION from EVENTS where NAME = :name allow filtering. Please note that preparing the same query more than once is generally an anti-pattern and will likely affect performance. Consider preparing the statement only once.
> 13:13:33,948 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Discovered coordinator contabo:9092 (id: 2147483647 rack: null) for group pokertracker.
> 13:13:34,009 INFO  [org.apache.kafka.clients.consumer.internals.AbstractCoordinator] (EE-ManagedThreadFactory-default-Thread-1) Marking the coordinator contabo:9092 (id: 2147483647 rack: null) dead for group pokertracker
> 
> 
> Here is the Java code:
> 
> @Startup
> @ConcurrencyManagement(ConcurrencyManagementType.BEAN)
> @Singleton
> public class InMemoryCache {
> 
>    @Inject
>    KafkaConsumer<String, String> consumer;
> 
>    @Dedicated
>    @Inject
>    ExecutorService kafka;
> 
>    ...
> 
>    @PostConstruct
>    public void onInit() {
> ...
>        CompletableFuture
>                .runAsync(this::handleKafkaEvent, kafka);
>    }
> 
>    public void handleKafkaEvent() {
>        while (true) {
>            ConsumerRecords<String, String> records = consumer.poll(200);
>            for (ConsumerRecord<String, String> record : records) {
>                switch (record.topic()) {
>                    case KafkaProvider.TOPIC:
>                        System.out.println("record.value() = " + record.value());
>                        List<CoreEvent> events = converter.convertToEvents(record.value());
>                        for (CoreEvent event : events) {
>                            handle(event);
>                        }
>                        break;
>                    default:
>                        throw new IllegalArgumentException("Illegal message type: " + record.topic());
>                }
>            }
>        }
>    }
> 
> ...
>