Posted to user@flink.apache.org by Dominik Safaric <do...@gmail.com> on 2016/11/03 07:05:47 UTC

Flink Kafka 0.10.0.0 connector

Dear all,

Although Flink 1.2 will roll out a Flink Kafka 0.10.x connector, I want to use the Flink Kafka 0.9 connector in conjunction with the Kafka 0.10.x versions.

The reason is that we are currently evaluating Flink as part of an empirical study, so a stable release is required. We need the Kafka 0.10.x versions because, as of 0.10.0, Kafka supports consumer and producer interceptors as well as message timestamps.

To make the 0.9 connector support a Kafka 0.10.x version such as 0.10.0, I've so far changed the Flink Kafka 0.9 connector's Kafka dependency to the required version and built the project. However, when I imported the jar and added the source to the StreamExecutionEnvironment, a type error occurred stating that the addSource function requires a class deriving from the SourceFunction interface.

Hence, what has gone wrong during the build? I assume a dependency issue.

Next, I tried simply overriding the dependencies of the Flink Kafka connector within the project's pom.xml; however, there is obviously a slight API mismatch, so this cannot be done.
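For reference, the override attempted in the project's pom.xml was along these lines (the coordinates are the usual Kafka client ones; the version number is illustrative):

```xml
<!-- illustrative: force the Kafka client version the 0.9 connector compiles against -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.1</version>
</dependency>
```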

I would really appreciate it if anyone could provide some guidance on how to successfully build a Flink Kafka connector supporting the Kafka 0.10.x versions.

Thanks in advance,
Dominik 

Re: Flink Kafka 0.10.0.0 connector

Posted by Dominik Safaric <do...@gmail.com>.
Hi Robert,

Thanks for sharing this insight. 

However, the Flink Kafka 0.10 connector is only compatible with 1.2-SNAPSHOT.

Despite that, I've managed to get the Flink Kafka 0.9 connector to work with Kafka 0.10.0.1. Only minor changes to the test code had to be made, mostly regarding the ZooKeeper utilities.

Thanks for your help though!
Dominik

> On 3 Nov 2016, at 13:59, Robert Metzger <rm...@apache.org> wrote:
> 
> Hi,
> I just tried the Kafka 0.10 connector again, and I could not reproduce the issue you are reporting.
> 
> This is my test job:
> 
> // parse input arguments
> final ParameterTool parameterTool = ParameterTool.fromArgs(args);
> 
> if (parameterTool.getNumberOfParameters() < 4) {
>    System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
>          "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
>    return;
> }
> 
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().disableSysoutLogging();
> env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
> env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
> env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
> 
> DataStream<String> messageStream = env
>       .addSource(new FlinkKafkaConsumer010<>(
>             parameterTool.getRequired("topic"),
>             new SimpleStringSchema(),
>             parameterTool.getProperties()));
> 
> // write kafka stream to standard out.
> messageStream.print();
> 
> env.execute("Read from Kafka example");
> 


Re: Flink Kafka 0.10.0.0 connector

Posted by Robert Metzger <rm...@apache.org>.
Hi,
I just tried the Kafka 0.10 connector again, and I could not reproduce the
issue you are reporting.

This is my test job:

// parse input arguments
final ParameterTool parameterTool = ParameterTool.fromArgs(args);

if (parameterTool.getNumberOfParameters() < 4) {
   System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
         "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
   return;
}

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

DataStream<String> messageStream = env
      .addSource(new FlinkKafkaConsumer010<>(
            parameterTool.getRequired("topic"),
            new SimpleStringSchema(),
            parameterTool.getProperties()));

// write kafka stream to standard out.
messageStream.print();

env.execute("Read from Kafka example");
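For completeness, a minimal sketch of the Properties the FlinkKafkaConsumer010 constructor expects when they are not parsed from the command line via ParameterTool (the helper class name and broker address below are illustrative, not Flink API):

```java
import java.util.Properties;

// Illustrative helper (not part of Flink): builds the minimal consumer
// configuration that the job above otherwise reads via ParameterTool.
public class KafkaConsumerProps {

    public static Properties minimalProps(String brokers, String groupId) {
        Properties props = new Properties();
        // Kafka broker(s) to bootstrap from
        props.setProperty("bootstrap.servers", brokers);
        // consumer group id used for committing offsets
        props.setProperty("group.id", groupId);
        return props;
    }

    public static void main(String[] args) {
        Properties props = minimalProps("localhost:9092", "flink-test");
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
    }
}
```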


On Thu, Nov 3, 2016 at 1:48 PM, Dominik Safaric <do...@gmail.com>
wrote:

> Hi Robert,
>
> I think the easiest way to get Kafka 0.10 running with Flink is to use the
> Kafka 0.10 connector in the current Flink master.
>
>
> Well, I’ve already built the Kafka 0.10 connector from the master, but
> unfortunately I keep getting a type-checker error that the type of
> FlinkKafkaConsumer010 and the one required by StreamExecutionEnvironment
> are not compatible; that is, addSource requires a subclass of
> SourceFunction<T>, whereas the FlinkKafkaConsumer010 instance is of type
> FlinkKafkaConsumer010<T>.
>
> Which I find quite strange, because I would assume that the
> FlinkKafkaConsumer instance should be a SourceFunction. However, the same
> even happened while building FlinkKafkaConsumer09.
>
> Any hint what might be going on?
>
> I’ve built the jar via a clean Maven package (without running the tests).
>
> Thanks,
> Dominik

Re: Flink Kafka 0.10.0.0 connector

Posted by Dominik Safaric <do...@gmail.com>.
Hi Robert,

> I think the easiest way to get Kafka 0.10 running with Flink is to use the Kafka 0.10 connector in the current Flink master.


Well, I've already built the Kafka 0.10 connector from the master, but unfortunately I keep getting a type-checker error that the type of FlinkKafkaConsumer010 and the one required by StreamExecutionEnvironment are not compatible; that is, addSource requires a subclass of SourceFunction<T>, whereas the FlinkKafkaConsumer010 instance is of type FlinkKafkaConsumer010<T>.

Which I find quite strange, because I would assume that the FlinkKafkaConsumer instance should be a SourceFunction. However, the same even happened while building FlinkKafkaConsumer09.

Any hint what might be going on?

I've built the jar via a clean Maven package (without running the tests).
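In case it helps others hitting the same error: one possible cause is that a hand-built connector jar bundles its own copy of the Flink streaming classes, so the SourceFunction the consumer implements is a different Class object than the one addSource expects. A quick, stdlib-only way to check where each class was loaded from (the class name ClasspathCheck and the comment's class names are illustrative):

```java
// Illustrative diagnostic (not Flink API): print the jar a class was loaded from.
// In the failing job one would pass SourceFunction.class and
// FlinkKafkaConsumer010.class; if they resolve to different locations or
// Flink versions, the "addSource requires a SourceFunction" error can follow.
public class ClasspathCheck {

    static String locationOf(Class<?> clazz) {
        // the code source is null only for bootstrap classes (e.g. java.lang.String)
        return clazz.getProtectionDomain().getCodeSource().getLocation().toString();
    }

    public static void main(String[] args) {
        // using this class itself as a stand-in, since it has a code source
        System.out.println(locationOf(ClasspathCheck.class));
    }
}
```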

Thanks,
Dominik



Re: Flink Kafka 0.10.0.0 connector

Posted by Robert Metzger <rm...@apache.org>.
Hi Dominik,

Some of Kafka's APIs changed between Kafka 0.9 and 0.10, so you cannot
compile the Kafka 0.9 connector against the Kafka 0.10 dependencies.

I think the easiest way to get Kafka 0.10 running with Flink is to use the
Kafka 0.10 connector in the current Flink master.
You can probably copy the connector's code into your own project and use
the new connector from there.

Regards,
Robert

