Posted to user@flink.apache.org by Zhun Shen <sh...@gmail.com> on 2016/03/30 09:21:52 UTC

Kafka Test Error

Hi there,

flink version: 1.0.0
kafka version: 0.9.0.0
env: local

I run the script below:
./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup --partition.assignment.strategy round robin

But I got the error:
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)


The code is below:
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>("nginx-logs", new SimpleStringSchema(), parameterTool.getProperties()));
        messageStream.rebalance().map(new MapFunction<String, String>() {

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        }).print();


I googled the error, and it seems this is a method from kafka-clients 0.9.0.1. Any ideas? Thanks.
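A NoSuchMethodError like this usually means two versions of kafka-clients end up on the classpath. One way to check which jar actually supplies KafkaConsumer at runtime is to print the class's code source. A minimal sketch (WhichJar is an illustrative name, not part of the job above):

```java
// Hypothetical diagnostic, not from this thread: print the jar a class
// was loaded from, to spot which kafka-clients version is on the classpath.
public class WhichJar {
    static String locate(Class<?> c) {
        java.security.CodeSource src = c.getProtectionDomain().getCodeSource();
        // Classes from the JDK's bootstrap loader report no code source.
        return src == null ? "(bootstrap classpath)" : src.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // In the failing job one would pass
        // "org.apache.kafka.clients.consumer.KafkaConsumer" as the argument.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(name + " -> " + locate(Class.forName(name)));
    }
}
```

Running this with the suspect class name inside the same classpath as the job shows which jar wins.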


Re: Kafka Test Error

Posted by Zhun Shen <sh...@gmail.com>.
I created a new project and added only the kafka-clients, flink-connector-kafka, and Flink streaming libs; it works now.

Thanks.
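The minimal dependency set described above can be sketched in build.gradle as follows (versions taken from this thread; note there is no explicit kafka-clients line, since the connector pulls its own):

```groovy
dependencies {
    compile 'org.apache.flink:flink-java:1.0.0'
    compile 'org.apache.flink:flink-streaming-java_2.10:1.0.0'
    // Pulls in kafka-clients 0.9.0.1 transitively; do not add it explicitly.
    compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0'
}
```

Running `gradle dependencies` afterwards should show only a single kafka-clients version in the resolved graph.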




Re: Kafka Test Error

Posted by Stephan Ewen <se...@apache.org>.
The issue may be that you include Kafka twice:

1) You explicitly add "org.apache.kafka:kafka-clients:*0.9.0.0*"
2) You add "org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0", which
internally adds "org.apache.kafka:kafka-clients:*0.9.0.1*"

These two Kafka versions may conflict. I would drop the dependency (1) and
simply let the FlinkKafkaConsumer pull whatever dependency it needs by
itself.
The 0.9.0.1 client that Flink uses internally should read fine from Kafka
0.9.0.0 brokers.

Greetings,
Stephan
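
If the explicit kafka-clients line cannot simply be dropped, Gradle can also exclude the transitive copy so that only one version remains on the classpath (a sketch, not tested against this build):

```groovy
// Keep only one kafka-clients by excluding the transitive 0.9.0.1 copy
// that the connector brings in, pinning the explicitly declared version.
compile('org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0') {
    exclude group: 'org.apache.kafka', module: 'kafka-clients'
}
compile 'org.apache.kafka:kafka-clients:0.9.0.0'
```

Stephan's simpler route, dropping the explicit dependency, is usually preferable, since the connector is built and tested against its own client version.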



Re: Kafka Test Error

Posted by Zhun Shen <sh...@gmail.com>.
Yeah, I mean I read the demo with FlinkKafkaConsumer08 (http://data-artisans.com/kafka-flink-a-practical-how-to/), then I wrote the program based on Kafka 0.9.0.0 and Flink 1.0.0.



Re: Kafka Test Error

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Did you make sure the Flink connector version and the Flink version are the
same? Also, for Kafka 0.8.0.0 you will have to use FlinkKafkaConsumer08.


Re: Kafka Test Error

Posted by Zhun Shen <sh...@gmail.com>.
I followed the Kafka example in the Flink docs.

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("partition.assignment.strategy", "range");

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>("nginx-logs", new SimpleStringSchema(), properties));

        messageStream
                .rebalance()
                .map(new MapFunction<String, String>() {

                    @Override
                    public String map(String value) throws Exception {
                        return "Kafka and Flink says: " + value;
                    }
                }).print();

        env.execute();
    }


I always get the error below:

java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)






Re: Kafka Test Error

Posted by Ashutosh Kumar <km...@gmail.com>.
I am using Flink 1.0.0 with Kafka 0.9. It works fine for me. I use the
following dependency.

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
    <version>1.0.0</version>
    <scope>provided</scope>
</dependency>

Thanks
Ashutosh

On Fri, Apr 1, 2016 at 10:46 AM, Zhun Shen <sh...@gmail.com> wrote:

> Hi there,
>
> I checked my build.gradle file; I use
> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that
> this lib is based on kafka-clients 0.9.0.1.
>
> I want to use Flink streaming to consume Kafka’s events in real time, but
> I’m confused by Flink’s libs with different versions. Which
> flink-connector-kafka is compatible with Kafka 0.9.0.0?
> My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: java
>
> part of my build.gradle:
> 'org.apache.kafka:kafka_2.10:0.9.0.0',
> 'org.apache.kafka:kafka-clients:0.9.0.0',
> 'org.apache.flink:flink-java:1.0.0',
> 'org.apache.flink:flink-streaming-java_2.10:1.0.0',
> 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
> 'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'
>
> Any advice?
>
> Thanks.
>
>
> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> A "NoSuchMethodError" usually means that you compile and run against
> different versions.
>
> Make sure the version you reference in the IDE and the version on the
> cluster are the same.
>
> Greetings,
> Stephan
>
>
>
> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <
> balaji.rajagopalan@olacabs.com> wrote:
>
>> I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't
>> talk about kafka 0.9.0.1.
>>
>> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <sh...@gmail.com>
>> wrote:
>>
>>> Hi there,
>>>
>>> flink version: 1.0.0
>>> kafka version: 0.9.0.0
>>> env: local
>>>
>>> I run the script below:
>>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs
>>> --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --
>>> group.id myGroup --partition.assignment.strategy round robin
>>>
>>> But I got the error:
>>> java.lang.NoSuchMethodError:
>>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>>         at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>>
>>>
>>> The code is as below:
>>>         DataStream<String> messageStream = env.addSource(new
>>> FlinkKafkaConsumer09<>("nginx-logs", new
>>> SimpleStringSchema(),parameterTool.getProperties()));
>>>         messageStream.rebalance().map(new MapFunction<String, String>() {
>>>
>>>             @Override
>>>             public String map(String value) throws Exception {
>>>                 return "Kafka and Flink says: " + value;
>>>             }
>>>         }).print();
>>>
>>>
>>> I checked the error with Google; it shows this is a method from Kafka
>>> 0.9.0.1. Any idea? Thanks.
>>>
>>>
>>
>
>

Re: Kafka Test Error

Posted by Zhun Shen <sh...@gmail.com>.
Hi there,

I checked my build.gradle file; I use 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0', but I found that this lib is based on kafka-clients 0.9.0.1.

I want to use Flink streaming to consume Kafka’s events in real time, but I’m confused by Flink’s libs with different versions. Which flink-connector-kafka is compatible with Kafka 0.9.0.0?
My environment is Kafka: 0.9.0.0, Flink: 1.0.0, Language: java

part of my build.gradle:
'org.apache.kafka:kafka_2.10:0.9.0.0',
'org.apache.kafka:kafka-clients:0.9.0.0',
'org.apache.flink:flink-java:1.0.0',
'org.apache.flink:flink-streaming-java_2.10:1.0.0',
'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0',
'org.apache.flink:flink-connector-kafka-base_2.10:1.0.0'

Any advice?

Thanks.
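A possible way to reconcile the dependency list above (a sketch, assuming the Gradle Groovy DSL of that era and that nothing else in the build needs kafka-clients 0.9.0.0): drop the explicit kafka-clients entry so the connector's own transitive 0.9.0.1 is used, or force a single version outright.

```groovy
// build.gradle sketch (assumption: Groovy DSL, Flink 1.0.0-era "compile" scope).
// flink-connector-kafka-0.9_2.10:1.0.0 already pulls in kafka-clients 0.9.0.1
// transitively, so no explicit kafka-clients line is needed.
dependencies {
    compile 'org.apache.flink:flink-java:1.0.0'
    compile 'org.apache.flink:flink-streaming-java_2.10:1.0.0'
    compile 'org.apache.flink:flink-connector-kafka-0.9_2.10:1.0.0'
}

// If some other dependency still drags in 0.9.0.0, pin one version explicitly:
configurations.all {
    resolutionStrategy.force 'org.apache.kafka:kafka-clients:0.9.0.1'
}
```

Running `gradle dependencies` afterwards prints the resolved tree, so you can confirm only one kafka-clients version remains on the classpath.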


> On Mar 30, 2016, at 10:35 PM, Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> A "NoSuchMethodError" usually means that you compile and run against different versions.
> 
> Make sure the version you reference in the IDE and the version on the cluster are the same.
> 
> Greetings,
> Stephan
> 
> 
> 
> On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <balaji.rajagopalan@olacabs.com> wrote:
> I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't talk about kafka 0.9.0.1. 
> 
> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <shenzhunallen@gmail.com> wrote:
> Hi there,
> 
> flink version: 1.0.0
> kafka version: 0.9.0.0
> env: local
> 
> I run the script below:
> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myGroup --partition.assignment.strategy round robin
> 
> But I got the error:
> java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
> 
> 
> The code is as below:
>         DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>("nginx-logs", new SimpleStringSchema(),parameterTool.getProperties()));
>         messageStream.rebalance().map(new MapFunction<String, String>() {
> 
>             @Override
>             public String map(String value) throws Exception {
>                 return "Kafka and Flink says: " + value;
>             }
>         }).print();
> 
> 
> I checked the error with Google; it shows this is a method from Kafka 0.9.0.1. Any idea? Thanks.
> 
> 
> 


Re: Kafka Test Error

Posted by Stephan Ewen <se...@apache.org>.
Hi!

A "NoSuchMethodError" usually means that you compile and run against
different versions.

Make sure the version you reference in the IDE and the version on the
cluster are the same.

Greetings,
Stephan
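One quick way to act on this advice (a hedged sketch using plain JDK reflection, not any Flink or Kafka API): probe which jar actually supplies the class at runtime, and whether it has the method from the stack trace. The default class and method names below are taken from the error earlier in this thread.

```java
import java.security.CodeSource;
import java.util.Arrays;

// Diagnostic sketch: report whether a method exists on the version of a class
// actually loaded from the classpath, and which jar (or the JDK) supplied it.
// Run it with the same classpath as the failing Flink job.
public class MethodCheck {
    public static void main(String[] args) throws Exception {
        String className  = args.length > 0 ? args[0]
                : "org.apache.kafka.clients.consumer.KafkaConsumer";
        String methodName = args.length > 1 ? args[1] : "partitionsFor";

        Class<?> cls = Class.forName(className);
        boolean found = Arrays.stream(cls.getMethods())
                .anyMatch(m -> m.getName().equals(methodName));

        // Bootstrap (JDK) classes have no CodeSource; jars report their location.
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        String origin = (src == null) ? "the JDK runtime"
                                      : src.getLocation().toString();
        System.out.println(methodName
                + (found ? " found in " : " MISSING from ") + origin);
    }
}
```

If it prints MISSING, the jar named in the output is an older kafka-clients shadowing the version the connector was compiled against.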



On Wed, Mar 30, 2016 at 9:42 AM, Balaji Rajagopalan <
balaji.rajagopalan@olacabs.com> wrote:

> I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't
> talk about kafka 0.9.0.1.
>
> On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <sh...@gmail.com>
> wrote:
>
>> Hi there,
>>
>> flink version: 1.0.0
>> kafka version: 0.9.0.0
>> env: local
>>
>> I run the script below:
>> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs
>> --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --
>> group.id myGroup --partition.assignment.strategy round robin
>>
>> But I got the error:
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>>         at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>>         at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>>         at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>>
>>
>> The code is as below:
>>         DataStream<String> messageStream = env.addSource(new
>> FlinkKafkaConsumer09<>("nginx-logs", new
>> SimpleStringSchema(),parameterTool.getProperties()));
>>         messageStream.rebalance().map(new MapFunction<String, String>() {
>>
>>             @Override
>>             public String map(String value) throws Exception {
>>                 return "Kafka and Flink says: " + value;
>>             }
>>         }).print();
>>
>>
>> I checked the error with Google; it shows this is a method from Kafka
>> 0.9.0.1. Any idea? Thanks.
>>
>>
>

Re: Kafka Test Error

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
I have tested kafka 0.8.0.2 with flink 1.0.0 and it works for me. Can't
talk about kafka 0.9.0.1.

On Wed, Mar 30, 2016 at 12:51 PM, Zhun Shen <sh...@gmail.com> wrote:

> Hi there,
>
> flink version: 1.0.0
> kafka version: 0.9.0.0
> env: local
>
> I run the script below:
> ./bin/flink run -c com.test.flink.FlinkTest test.jar --topic nginx-logs
> --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --
> group.id myGroup --partition.assignment.strategy round robin
>
> But I got the error:
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(Ljava/lang/String;)Ljava/util/List;
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:194)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:164)
>         at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.<init>(FlinkKafkaConsumer09.java:131)
>
>
> The code is as below:
>         DataStream<String> messageStream = env.addSource(new
> FlinkKafkaConsumer09<>("nginx-logs", new
> SimpleStringSchema(),parameterTool.getProperties()));
>         messageStream.rebalance().map(new MapFunction<String, String>() {
>
>             @Override
>             public String map(String value) throws Exception {
>                 return "Kafka and Flink says: " + value;
>             }
>         }).print();
>
>
> I checked the error with Google; it shows this is a method from Kafka
> 0.9.0.1. Any idea? Thanks.
>
>