Posted to user@flink.apache.org by "Lopez, Javier" <ja...@zalando.de> on 2016/02/17 11:10:47 UTC

Problem with Kafka 0.9 Client

Hi guys,

We are using Flink 1.0-SNAPSHOT with Kafka 0.9 Consumer and we have not
been able to retrieve data from our Kafka Cluster. The DEBUG data reports
the following:

10:53:24,365 DEBUG org.apache.kafka.clients.NetworkClient
     - Sending metadata request ClientRequest(expectResponse=true,
callback=null,
request=RequestSend(header={api_key=3,api_version=0,correlation_id=1673,client_id=flink_test},
body={topics=[stream_test_3]}), isInitiatedByNetworkClient,
createdTimeMs=1455702804364, sendTimeMs=0) to node 35
10:53:24,398 DEBUG org.apache.kafka.clients.Metadata
      - Updated cluster metadata version 838 to Cluster(nodes = [Node(41,
ip-XXXX.eu-west-1.compute.internal, 9092), Node(35,
ip-XXXX.eu-west-1.compute.internal, 9092), Node(87,
ip-XXXX.eu-west-1.compute.internal, 9092)], partitions = [Partition(topic =
stream_test_3, partition = 0, leader = 87, replicas = [87,41,35,], isr =
[87,41,35,], Partition(topic = stream_test_3, partition = 1, leader = 35,
replicas = [35,41,87,], isr = [35,41,87,], Partition(topic = stream_test_3,
partition = 4, leader = 87, replicas = [87,41,35,], isr = [87,41,35,],
Partition(topic = stream_test_3, partition = 3, leader = 35, replicas =
[35,87,41,], isr = [35,87,41,], Partition(topic = stream_test_3, partition
= 2, leader = 41, replicas = [41,87,35,], isr = [41,87,35,]])
10:53:24,398 DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Issuing
group metadata request to broker 35
10:53:24,432 DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
metadata response ClientResponse(receivedTimeMs=1455702804432,
disconnected=false, request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@63b68d94,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=1674,client_id=flink_test},
body={group_id=test}), createdTimeMs=1455702804398,
sendTimeMs=1455702804398),
responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})


We receive this message all the time. What we don't understand is this:
"responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}}". Since
we see an error_code, we assume there was a problem. Our Kafka cluster
works and we have other clients extracting data from it, so we don't know
whether this is a Kafka issue or a Flink issue.

Does anyone know, or understand, this response we are getting from Kafka?

Thanks.

Re: Problem with Kafka 0.9 Client

Posted by Robert Metzger <rm...@apache.org>.
Great, that's good news. Let us know if you encounter more issues with the
Kafka connector.

By the way, Kafka released 0.9.0.1; maybe updating your brokers to that
version resolves the issues? (The problems with some of the topics may have
been caused by bugs in Kafka.)


Re: Problem with Kafka 0.9 Client

Posted by "Lopez, Javier" <ja...@zalando.de>.
Hi Robert,

After we restarted our Kafka / ZooKeeper cluster, the consumer worked. Some
of our topics had problems. The Flink consumer for Kafka 0.9 works as
expected.

Thanks!


Re: Problem with Kafka 0.9 Client

Posted by "Lopez, Javier" <ja...@zalando.de>.
Hi, these are the properties:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092"); // full IPs removed for security reasons
properties.setProperty("zookeeper.connect", ".37:2181");
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");

We have tested with these as well:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092");
properties.setProperty("zookeeper.connect", ".37:2181");
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");

and these:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092");
properties.setProperty("zookeeper.connect", ".37:2181");
properties.setProperty("group.id", "test");
properties.setProperty("client.id", "flink_test");
properties.setProperty("auto.offset.reset", "earliest");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("session.timeout.ms", "30000");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

With all three different configurations we get the same result.
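
In case it helps, this is roughly how we wire those properties into the
Flink consumer. This is a trimmed sketch of our job, not the full code: the
topic name is from our test setup, only a subset of the settings above is
repeated here, and print() stands in for our real sink.

import java.util.Properties;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaReadJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Same settings as above (IPs redacted); the rest are added the same way.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", ".87:9092,.41:9092,.35:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("client.id", "flink_test");
        properties.setProperty("auto.offset.reset", "earliest");

        // FlinkKafkaConsumer09 takes the topic, a deserialization schema,
        // and the consumer properties.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer09<>("stream_test_3",
                        new SimpleStringSchema(), properties));

        stream.print(); // stand-in for the real sink

        env.execute("flink_test");
    }
}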


Re: Problem with Kafka 0.9 Client

Posted by Robert Metzger <rm...@apache.org>.
Thank you. Can you also send me the list of properties you are passing to
the Kafka consumer? Are you only setting "bootstrap.servers", or more?


Re: Problem with Kafka 0.9 Client

Posted by "Lopez, Javier" <ja...@zalando.de>.
Hi Robert,

Please find attached the full logs of one of our latest executions. We are
basically trying to read from our Kafka cluster and write the data to
Elasticsearch.

Thanks for your help!


Re: Problem with Kafka 0.9 Client

Posted by HungChang <un...@gmail.com>.
Had the same problem as Javier's. 

3450 [Thread-10] DEBUG
org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Group
metadata response ClientResponse(receivedTimeMs=1455811593680,
disconnected=false, request=ClientRequest(expectResponse=true,
callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@278904b4,
request=RequestSend(header={api_key=10,api_version=0,correlation_id=10,client_id=consumer-4},
body={group_id=test_group}), createdTimeMs=1455811593645,
sendTimeMs=1455811593645),
responseBody={error_code=15,coordinator={node_id=-1,host=,port=-1}})

When Flink consumes from Kafka 0.9 locally, and when another consumer reads
from the Kafka cluster, both work.

After reading this:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Detailed+Consumer+Coordinator+Design#KafkaDetailedConsumerCoordinatorDesign-Consumer

we found that the step that fails is the one where the consumer coordinator
should answer: the consumer sends a RegisterConsumer request to its
coordinator broker and, in the RegisterConsumerResponse, receives the list
of topic partitions it should own.

However we don't know the solution for this yet.
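
One way to narrow it down might be to run a bare Kafka 0.9 consumer against
the same brokers, outside Flink; if its DEBUG log shows the same
error_code=15, the coordinator problem is on the Kafka side rather than in
Flink. A minimal sketch (bootstrap server, topic, and group id are
placeholders for your own values):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class BareConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder
        props.put("group.id", "test_group");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("stream_test_3"));
        try {
            while (true) {
                // poll() triggers the coordinator lookup that fails
                // with error_code=15 in the logs above.
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}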

Best,

Sendoh




Re: Problem with Kafka 0.9 Client

Posted by Robert Metzger <rm...@apache.org>.
Hi Javier,

Sorry for the late response. In Kafka's error mapping, code 15 means
ConsumerCoordinatorNotAvailableCode:
https://github.com/apache/kafka/blob/0.9.0/core/src/main/scala/kafka/common/ErrorMapping.scala
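
If you want to decode such codes programmatically, the client's Errors enum
can map them. A small sketch (I believe the 0.9 client names code 15
GROUP_COORDINATOR_NOT_AVAILABLE, matching the broker-side constant above):

import org.apache.kafka.common.protocol.Errors;

public class ErrorCodeLookup {
    public static void main(String[] args) {
        // error_code taken from the responseBody in your log
        Errors error = Errors.forCode((short) 15);
        System.out.println(error.name());      // symbolic name of the error
        System.out.println(error.exception()); // exception Kafka maps it to
    }
}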

How many brokers did you put into the list of bootstrap servers?
Can you maybe send me the full log of one of the Flink TaskManagers reading
from Kafka?

