Posted to user@flink.apache.org by AndreaKinn <ki...@hotmail.it> on 2017/08/11 10:42:22 UTC

Error during Kafka connection

Hi,
Over the last week I successfully deployed a Flink program which gets data
from a Kafka broker on my local machine.
Now I'm trying to do the same thing, but with the Kafka broker moved to a
cluster.

I didn't change a single line of code; I report it here:

DataStream<Tuple2<String, JSONLDObject>> stream = env
				.addSource(new FlinkKafkaConsumer010<>(TOPIC, new CustomDeserializer(), properties))
				.assignTimestampsAndWatermarks(new CustomTimestampExtractor())
				.keyBy(0);
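
The properties are built roughly like this (a minimal sketch; the values below are placeholders, since the only thing I actually changed is the broker address):

Properties properties = new Properties();
// the only value that changed: the cluster's address instead of localhost
properties.setProperty("bootstrap.servers", "<kafka-broker-ip>:9092");
properties.setProperty("group.id", "groupId");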

I have changed just the Kafka IP; the data model obviously has not changed.
Unfortunately, now when I start the Flink program I get this:

INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka version : 0.10.0.1
12:30:48,446 INFO  org.apache.kafka.common.utils.AppInfoParser                   - Kafka commitId : a7a17cdec9eaa6c5
12:30:48,625 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) for group groupId.
12:30:48,626 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - *Marking the coordinator giordano-1-4-200:9092 (id: 2147483647 rack: null) dead for group groupId*

I bolded the line that worries me.

Also, no data is retrieved from Kafka, although Flink continues to perform
checkpointing etc. normally...

Any ideas?





Re: Error during Kafka connection

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I don’t have experience running Kafka clusters behind proxies, but it seems like the configuration options “advertised.host.name” and “advertised.port” for your Kafka brokers are what you’re looking for.
For more information on those, please refer to the Kafka documentation.
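
For example, a rough sketch of the relevant broker settings in server.properties (the hostname and port below are placeholders, and the exact keys depend on your broker version):

# address the broker advertises to clients, i.e. the publicly reachable one
advertised.host.name=<public-hostname-or-ip>
advertised.port=<public-port>

# on newer broker versions the equivalent (and preferred) setting is:
# advertised.listeners=PLAINTEXT://<public-hostname-or-ip>:<public-port>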

Cheers,
Gordon



Re: Error during Kafka connection

Posted by AndreaKinn <ki...@hotmail.it>.
Is it solvable? I'm not an expert in this stuff, and the cluster is managed by
the person in charge of the lab. Maybe I can ask him to do something to solve it.




Re: Error during Kafka connection

Posted by Kien Truong <du...@gmail.com>.
Hi, 

You mentioned that your Kafka broker is behind a proxy. This could be a problem, because when the client tries to get the cluster's topology, it will get the brokers' private addresses, which are not reachable.
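
One way to check what addresses the brokers actually advertise (a sketch; the ZooKeeper address and broker id below are placeholders) is to look at the broker registration in ZooKeeper:

bin/zookeeper-shell.sh <zookeeper-host>:2181 get /brokers/ids/<broker-id>

The "endpoints" field in the returned JSON is the address clients are told to connect to.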


Regards, 
Kien 



Re: Error during Kafka connection

Posted by AndreaKinn <ki...@hotmail.it>.
I just tried to telnet to the public IP:port from outside and it works.




Re: Error during Kafka connection

Posted by AndreaKinn <ki...@hotmail.it>.
I tried running the console consumer/producer from localhost on the cluster:
this tells me that the broker is currently active.

To reach the cluster from outside I use a redirect from a public (IP, port),
because the IP of the Kafka broker is private... I suspect the problem may
be there.




Re: Error during Kafka connection

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
No, there should be no difference between setting it up on Ubuntu or OS X.

I can’t really tell anything suspicious from the information provided so far, unfortunately.
Perhaps you can try first checking that the Kafka topic is consumable from where you’re running Flink, e.g. using the example console consumer / producers?
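
For example (the address and topic below are placeholders; run this from the machine where Flink runs, against the same address Flink uses):

bin/kafka-console-consumer.sh --bootstrap-server <public-ip>:<port> --topic <your-topic> --from-beginning

and, to push a few test messages:

bin/kafka-console-producer.sh --broker-list <broker-host>:<port> --topic <your-topic>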



Re: Error during Kafka connection

Posted by AndreaKinn <ki...@hotmail.it>.
The Kafka version I use is the latest (0.11.0.0). But to be honest, I also use
0.11.0.0 locally and in that case it works correctly. Anyway, the latest
Kafka connector for Flink is designed for Kafka 0.10.x.x.

I use OS X locally and Ubuntu on the cluster. Does that matter?




Re: Error during Kafka connection

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

AFAIK, Kafka group coordinators are supposed to always be marked dead, because we use static assignment internally and therefore Kafka's group coordination functionality is disabled.
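
To illustrate what “static assignment” means here, a rough sketch using the plain Kafka client API (this is not Flink’s actual internal code; the broker address and topic are placeholders):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "<broker-host>:9092"); // placeholder
        props.setProperty("group.id", "groupId");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // subscribe(): the broker-side group coordinator assigns partitions.
            // consumer.subscribe(Collections.singletonList("<your-topic>"));

            // assign(): the client picks its partitions itself ("static assignment"),
            // so group coordination is unused and the "marked dead" log line is harmless.
            consumer.assign(Collections.singletonList(new TopicPartition("<your-topic>", 0)));
        }
    }
}

In other words, that log line by itself is expected and is not the cause of the missing data.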

It may be obvious, but to get it out of the way first: are you sure that the Kafka installation version matches (i.e. 0.10.0.1)?

Cheers,
Gordon
