You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@hop.apache.org by monajit choudhury <mo...@gmail.com> on 2022/04/22 04:50:11 UTC

Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Hi,

I am trying to test a simple kafka consumer using Apache Hop v1.2. When I
run the pipeline using the local runner, it works fine. But if I run it
using the flink runner I get the following error

You can only have one copy of the injector transform 'output' to accept the
Kafka messages

I have tried debugging the Hop code and looks like the root cause is the
initSubPipeline() method being invoked multiple times while using the Flink
runner. That's not the case when I use the local runner. Am I missing
something here?


Thanks

Monajit Choudhury

Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Bart Maertens <ba...@know.bi>.
Hi Monajit,

All of our architecture discussions and decisions happen on the dev mailing
list [1].
A lot of the more informal user and developer communication happens on our
mattermost channels [2]. You're very welcome to join us there!

[1] https://lists.apache.org/list.html?dev@hop.apache.org
[2] https://chat.project-hop.org



On Tue, Apr 26, 2022 at 7:35 PM monajit choudhury <mo...@gmail.com>
wrote:

> Hi hans,
>
> I was able to run it on Flink up updating the Kafka libs. There are other
> issues though , which I am working on fixing , the biggest issue being
> consistency . Sometimes it works and sometimes it doesn’t .
>
> On a side note , is there a way I can participate in this project ? I will
> be happy to help in anyway I can as I truly believe in the potential of
> what you guys have built .
>
> Thanks
> Mono
>
> On Mon, Apr 25, 2022 at 9:14 AM monajit choudhury <mo...@gmail.com>
> wrote:
>
>> Hi Hans,
>>
>> Thanks a lot for the guidance. I was able to run it on Flink but looks
>> like there's a issue with the Kafka Consumer
>>
>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>>
>>
>> On analyzing the Fat jar I found that the version of the KafkaConsumer is
>> < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
>> jar should include.
>>
>>
>> Looks like Beam is using an older version of the kafka consumer
>>
>>
>>
>> Thanks
>>
>> Mono
>>
>>
>>
>>
>> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
>> hans.van.akelyen@gmail.com> wrote:
>>
>>> Hi Mono,
>>>
>>> I took a bit of time to set up a test environment on my local system
>>> because we can not always by heart if something actually works (we are
>>> working on more tests in combination with spark/flink/dataflow).
>>> But I can confirm it works with a Flink runner. I do agree that error
>>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>>> server is unavailable. The Flink job then never gets published to the
>>> cluster and you sit there wondering what's going on. When everything is
>>> configured correctly it works as expected.
>>>
>>> I created a sample pipeline using the Beam Kafka consumer and a write to
>>> text file to see if the data is being received in the correct format.
>>>
>>> Pipeline:
>>>
>>>  Screenshot 2022-04-23 at 14.55.06.png
>>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>>
>>> Flink console output:
>>>
>>>  Screenshot 2022-04-23 at 14.47.34.png
>>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>>
>>> Settings I used on the Beam run configuration:
>>>
>>>  Screenshot 2022-04-23 at 14.53.30.png
>>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>>
>>>
>>> Hope you get everything working.
>>> If there is anything more I can do please let me know.
>>>
>>> Kr,
>>> Hans
>>>
>>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Hans,
>>>>
>>>> Yeah I realized that apart from AVRO it supports string messages too.
>>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>>  The Java docs says that its only mean to be run with beam runners,
>>>> does it include the Flink runner ?
>>>>
>>>> Apart from that everything works like a charm and we even managed to
>>>> write some custom plugins for our usecases. If we can solve this kafka
>>>> consumer issue,  then we are all set for prime time.
>>>>
>>>> Really appreciate your responses so far.
>>>>
>>>> Thanks
>>>> Mono
>>>>
>>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com>
>>>> wrote:
>>>>
>>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>>
>>>>>
>>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <
>>>>> monojit.c@gmail.com>:
>>>>>
>>>>>> Hi Hans,
>>>>>>
>>>>>> Going through the log files I realized it had something to do with
>>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>>
>>>>>> Thanks
>>>>>> Mono
>>>>>>
>>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>>> hans.van.akelyen@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Monajit,
>>>>>>>
>>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>>
>>>>>>> Another solution (but not yet tested here so not sure it will work)
>>>>>>> is to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>>>> copies".
>>>>>>> More information about this can be found on our documentation pages
>>>>>>> [1]
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Hans
>>>>>>>
>>>>>>> [1]
>>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>>
>>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>>>>> it using the flink runner I get the following error
>>>>>>>>
>>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>>> accept the Kafka messages
>>>>>>>>
>>>>>>>> I have tried debugging the Hop code and looks like the root cause
>>>>>>>> is the initSubPipeline() method being invoked multiple times while using
>>>>>>>> the Flink runner. That's not the case when I use the local runner. Am I
>>>>>>>> missing something here?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Monajit Choudhury
>>>>>>>>
>>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>>
>>>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
Hi hans,

I was able to run it on Flink up updating the Kafka libs. There are other
issues though , which I am working on fixing , the biggest issue being
consistency . Sometimes it works and sometimes it doesn’t .

On a side note , is there a way I can participate in this project ? I will
be happy to help in anyway I can as I truly believe in the potential of
what you guys have built .

Thanks
Mono

On Mon, Apr 25, 2022 at 9:14 AM monajit choudhury <mo...@gmail.com>
wrote:

> Hi Hans,
>
> Thanks a lot for the guidance. I was able to run it on Flink but looks
> like there's a issue with the Kafka Consumer
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>
>
> On analyzing the Fat jar I found that the version of the KafkaConsumer is
> < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
> jar should include.
>
>
> Looks like Beam is using an older version of the kafka consumer
>
>
>
> Thanks
>
> Mono
>
>
>
>
> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
>> Hi Mono,
>>
>> I took a bit of time to set up a test environment on my local system
>> because we can not always by heart if something actually works (we are
>> working on more tests in combination with spark/flink/dataflow).
>> But I can confirm it works with a Flink runner. I do agree that error
>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>> server is unavailable. The Flink job then never gets published to the
>> cluster and you sit there wondering what's going on. When everything is
>> configured correctly it works as expected.
>>
>> I created a sample pipeline using the Beam Kafka consumer and a write to
>> text file to see if the data is being received in the correct format.
>>
>> Pipeline:
>>
>>  Screenshot 2022-04-23 at 14.55.06.png
>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>
>> Flink console output:
>>
>>  Screenshot 2022-04-23 at 14.47.34.png
>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>
>> Settings I used on the Beam run configuration:
>>
>>  Screenshot 2022-04-23 at 14.53.30.png
>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>
>>
>> Hope you get everything working.
>> If there is anything more I can do please let me know.
>>
>> Kr,
>> Hans
>>
>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
>> wrote:
>>
>>> Hi Hans,
>>>
>>> Yeah I realized that apart from AVRO it supports string messages too.
>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>  The Java docs says that its only mean to be run with beam runners, does
>>> it include the Flink runner ?
>>>
>>> Apart from that everything works like a charm and we even managed to
>>> write some custom plugins for our usecases. If we can solve this kafka
>>> consumer issue,  then we are all set for prime time.
>>>
>>> Really appreciate your responses so far.
>>>
>>> Thanks
>>> Mono
>>>
>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com> wrote:
>>>
>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>
>>>>
>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <monojit.c@gmail.com
>>>> >:
>>>>
>>>>> Hi Hans,
>>>>>
>>>>> Going through the log files I realized it had something to do with
>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>
>>>>> Thanks
>>>>> Mono
>>>>>
>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>> hans.van.akelyen@gmail.com> wrote:
>>>>>
>>>>>> Hi Monajit,
>>>>>>
>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>
>>>>>> Another solution (but not yet tested here so not sure it will work)
>>>>>> is to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>>> copies".
>>>>>> More information about this can be found on our documentation pages
>>>>>> [1]
>>>>>>
>>>>>> Kind regards,
>>>>>> Hans
>>>>>>
>>>>>> [1]
>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>
>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>>>> it using the flink runner I get the following error
>>>>>>>
>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>> accept the Kafka messages
>>>>>>>
>>>>>>> I have tried debugging the Hop code and looks like the root cause is
>>>>>>> the initSubPipeline() method being invoked multiple times while using the
>>>>>>> Flink runner. That's not the case when I use the local runner. Am I missing
>>>>>>> something here?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Monajit Choudhury
>>>>>>>
>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>
>>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Hans Van Akelyen <ha...@gmail.com>.
Hi Mono,

In the end when going to production you would not run from the GUI. It is
odd however that the metrics are not returned consistently when running
from the GUI.

The Beam upgrade has now been finished on our master branch, this included
quite some dependency cleanup and improvements to avoid duplicate
jars/classes.

Maybe you could give it a go with a fresh build from master and if the
issues are still there we can start digging and see what happens.

Kr,
Hans



On Fri, 13 May 2022 at 19:59, monajit choudhury <mo...@gmail.com> wrote:

> Sorry, didn't attach the consumer config. It looks like the consumer waits
> on something forever as the transform metrics doesnt show data.
>
> Thanks
> Mono
>
> On Fri, May 13, 2022 at 10:36 AM monajit choudhury <mo...@gmail.com>
> wrote:
>
>> Hi Hans,
>>
>> So the Beam kafka consumer just doesn't consume messages consitently when
>> I run it on Flink or Beam Runner using the GUI. It did a few times and then
>> just stopped working.  The Beam producer works fine. Not sure what am I
>> missing here
>>
>> Thanks
>> Mono
>>
>> On Mon, Apr 25, 2022 at 10:18 AM Hans Van Akelyen <
>> hans.van.akelyen@gmail.com> wrote:
>>
>>> Hi Mono,
>>>
>>> I tested against the kafka image from bitnami (
>>> https://hub.docker.com/r/bitnami/kafka/) which uses kafka 3.1 on their
>>> latest tag and it worked.
>>> That being said it seems Beam runst tests against kafka 1.0.0 up to
>>> 2.5.1 (
>>> https://github.com/apache/beam/blob/880b10e71b963de6ec20efe614dd866e9a809da4/sdks/java/io/kafka/build.gradle
>>> )
>>>
>>> It does seem we add the 1.0.0 lib to our release which is a really old
>>> version, I know Matt is currently working on a Beam upgrade for our next
>>> release so I'll add it to the list to get this sorted out.
>>>
>>> It will be safe to replace the kafka-clients jar located under
>>> plugins/engines/beam/lib with the same jar as from the kafka transform
>>> located under plugins/transforms/kafka, or even update both to 2.5.1.
>>>
>>> Kind regards,
>>> Hans
>>>
>>> On Mon, 25 Apr 2022 at 18:14, monajit choudhury <mo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Hans,
>>>>
>>>> Thanks a lot for the guidance. I was able to run it on Flink but looks
>>>> like there's a issue with the Kafka Consumer
>>>>
>>>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>>>> java.lang.NoSuchMethodError:
>>>> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>>>>
>>>>
>>>> On analyzing the Fat jar I found that the version of the KafkaConsumer
>>>> is < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
>>>> jar should include.
>>>>
>>>>
>>>> Looks like Beam is using an older version of the kafka consumer
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Mono
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
>>>> hans.van.akelyen@gmail.com> wrote:
>>>>
>>>>> Hi Mono,
>>>>>
>>>>> I took a bit of time to set up a test environment on my local system
>>>>> because we can not always by heart if something actually works (we are
>>>>> working on more tests in combination with spark/flink/dataflow).
>>>>> But I can confirm it works with a Flink runner. I do agree that error
>>>>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>>>>> server is unavailable. The Flink job then never gets published to the
>>>>> cluster and you sit there wondering what's going on. When everything is
>>>>> configured correctly it works as expected.
>>>>>
>>>>> I created a sample pipeline using the Beam Kafka consumer and a write
>>>>> to text file to see if the data is being received in the correct format.
>>>>>
>>>>> Pipeline:
>>>>>
>>>>>  Screenshot 2022-04-23 at 14.55.06.png
>>>>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>>>>
>>>>> Flink console output:
>>>>>
>>>>>  Screenshot 2022-04-23 at 14.47.34.png
>>>>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>>>>
>>>>> Settings I used on the Beam run configuration:
>>>>>
>>>>>  Screenshot 2022-04-23 at 14.53.30.png
>>>>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>>>>
>>>>>
>>>>> Hope you get everything working.
>>>>> If there is anything more I can do please let me know.
>>>>>
>>>>> Kr,
>>>>> Hans
>>>>>
>>>>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Hans,
>>>>>>
>>>>>> Yeah I realized that apart from AVRO it supports string messages too.
>>>>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>>>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>>>>  The Java docs says that its only mean to be run with beam runners,
>>>>>> does it include the Flink runner ?
>>>>>>
>>>>>> Apart from that everything works like a charm and we even managed to
>>>>>> write some custom plugins for our usecases. If we can solve this kafka
>>>>>> consumer issue,  then we are all set for prime time.
>>>>>>
>>>>>> Really appreciate your responses so far.
>>>>>>
>>>>>> Thanks
>>>>>> Mono
>>>>>>
>>>>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com>
>>>>>> wrote:
>>>>>>
>>>>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>>>>
>>>>>>>
>>>>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <
>>>>>>> monojit.c@gmail.com>:
>>>>>>>
>>>>>>>> Hi Hans,
>>>>>>>>
>>>>>>>> Going through the log files I realized it had something to do with
>>>>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> Mono
>>>>>>>>
>>>>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>>>>> hans.van.akelyen@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Monajit,
>>>>>>>>>
>>>>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>>>>
>>>>>>>>> Another solution (but not yet tested here so not sure it will
>>>>>>>>> work) is to force it to a single thread by setting SINGLE_BEAM in the
>>>>>>>>> "number of copies".
>>>>>>>>> More information about this can be found on our documentation
>>>>>>>>> pages [1]
>>>>>>>>>
>>>>>>>>> Kind regards,
>>>>>>>>> Hans
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>>>>
>>>>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <
>>>>>>>>> monojit.c@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> I am trying to test a simple kafka consumer using Apache Hop
>>>>>>>>>> v1.2. When I run the pipeline using the local runner, it works fine. But if
>>>>>>>>>> I run it using the flink runner I get the following error
>>>>>>>>>>
>>>>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>>>>> accept the Kafka messages
>>>>>>>>>>
>>>>>>>>>> I have tried debugging the Hop code and looks like the root cause
>>>>>>>>>> is the initSubPipeline() method being invoked multiple times while using
>>>>>>>>>> the Flink runner. That's not the case when I use the local runner. Am I
>>>>>>>>>> missing something here?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>>
>>>>>>>>>> Monajit Choudhury
>>>>>>>>>>
>>>>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>>>>
>>>>>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
Sorry, didn't attach the consumer config. It looks like the consumer waits
on something forever as the transform metrics doesnt show data.

Thanks
Mono

On Fri, May 13, 2022 at 10:36 AM monajit choudhury <mo...@gmail.com>
wrote:

> Hi Hans,
>
> So the Beam kafka consumer just doesn't consume messages consitently when
> I run it on Flink or Beam Runner using the GUI. It did a few times and then
> just stopped working.  The Beam producer works fine. Not sure what am I
> missing here
>
> Thanks
> Mono
>
> On Mon, Apr 25, 2022 at 10:18 AM Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
>> Hi Mono,
>>
>> I tested against the kafka image from bitnami (
>> https://hub.docker.com/r/bitnami/kafka/) which uses kafka 3.1 on their
>> latest tag and it worked.
>> That being said it seems Beam runst tests against kafka 1.0.0 up to 2.5.1
>> (
>> https://github.com/apache/beam/blob/880b10e71b963de6ec20efe614dd866e9a809da4/sdks/java/io/kafka/build.gradle
>> )
>>
>> It does seem we add the 1.0.0 lib to our release which is a really old
>> version, I know Matt is currently working on a Beam upgrade for our next
>> release so I'll add it to the list to get this sorted out.
>>
>> It will be safe to replace the kafka-clients jar located under
>> plugins/engines/beam/lib with the same jar as from the kafka transform
>> located under plugins/transforms/kafka, or even update both to 2.5.1.
>>
>> Kind regards,
>> Hans
>>
>> On Mon, 25 Apr 2022 at 18:14, monajit choudhury <mo...@gmail.com>
>> wrote:
>>
>>> Hi Hans,
>>>
>>> Thanks a lot for the guidance. I was able to run it on Flink but looks
>>> like there's a issue with the Kafka Consumer
>>>
>>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>>> java.lang.NoSuchMethodError:
>>> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>>>
>>>
>>> On analyzing the Fat jar I found that the version of the KafkaConsumer
>>> is < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
>>> jar should include.
>>>
>>>
>>> Looks like Beam is using an older version of the kafka consumer
>>>
>>>
>>>
>>> Thanks
>>>
>>> Mono
>>>
>>>
>>>
>>>
>>> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
>>> hans.van.akelyen@gmail.com> wrote:
>>>
>>>> Hi Mono,
>>>>
>>>> I took a bit of time to set up a test environment on my local system
>>>> because we can not always by heart if something actually works (we are
>>>> working on more tests in combination with spark/flink/dataflow).
>>>> But I can confirm it works with a Flink runner. I do agree that error
>>>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>>>> server is unavailable. The Flink job then never gets published to the
>>>> cluster and you sit there wondering what's going on. When everything is
>>>> configured correctly it works as expected.
>>>>
>>>> I created a sample pipeline using the Beam Kafka consumer and a write
>>>> to text file to see if the data is being received in the correct format.
>>>>
>>>> Pipeline:
>>>>
>>>>  Screenshot 2022-04-23 at 14.55.06.png
>>>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>>>
>>>> Flink console output:
>>>>
>>>>  Screenshot 2022-04-23 at 14.47.34.png
>>>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>>>
>>>> Settings I used on the Beam run configuration:
>>>>
>>>>  Screenshot 2022-04-23 at 14.53.30.png
>>>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>>>
>>>>
>>>> Hope you get everything working.
>>>> If there is anything more I can do please let me know.
>>>>
>>>> Kr,
>>>> Hans
>>>>
>>>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Hans,
>>>>>
>>>>> Yeah I realized that apart from AVRO it supports string messages too.
>>>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>>>  The Java docs says that its only mean to be run with beam runners,
>>>>> does it include the Flink runner ?
>>>>>
>>>>> Apart from that everything works like a charm and we even managed to
>>>>> write some custom plugins for our usecases. If we can solve this kafka
>>>>> consumer issue,  then we are all set for prime time.
>>>>>
>>>>> Really appreciate your responses so far.
>>>>>
>>>>> Thanks
>>>>> Mono
>>>>>
>>>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com>
>>>>> wrote:
>>>>>
>>>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>>>
>>>>>>
>>>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <
>>>>>> monojit.c@gmail.com>:
>>>>>>
>>>>>>> Hi Hans,
>>>>>>>
>>>>>>> Going through the log files I realized it had something to do with
>>>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>>>
>>>>>>> Thanks
>>>>>>> Mono
>>>>>>>
>>>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>>>> hans.van.akelyen@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Monajit,
>>>>>>>>
>>>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>>>
>>>>>>>> Another solution (but not yet tested here so not sure it will work)
>>>>>>>> is to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>>>>> copies".
>>>>>>>> More information about this can be found on our documentation pages
>>>>>>>> [1]
>>>>>>>>
>>>>>>>> Kind regards,
>>>>>>>> Hans
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>>>
>>>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <
>>>>>>>> monojit.c@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>>>>>> it using the flink runner I get the following error
>>>>>>>>>
>>>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>>>> accept the Kafka messages
>>>>>>>>>
>>>>>>>>> I have tried debugging the Hop code and looks like the root cause
>>>>>>>>> is the initSubPipeline() method being invoked multiple times while using
>>>>>>>>> the Flink runner. That's not the case when I use the local runner. Am I
>>>>>>>>> missing something here?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>>
>>>>>>>>> Monajit Choudhury
>>>>>>>>>
>>>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>>>
>>>>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
Hi Hans,

So the Beam kafka consumer just doesn't consume messages consitently when I
run it on Flink or Beam Runner using the GUI. It did a few times and then
just stopped working.  The Beam producer works fine. Not sure what am I
missing here

Thanks
Mono

On Mon, Apr 25, 2022 at 10:18 AM Hans Van Akelyen <
hans.van.akelyen@gmail.com> wrote:

> Hi Mono,
>
> I tested against the kafka image from bitnami (
> https://hub.docker.com/r/bitnami/kafka/) which uses kafka 3.1 on their
> latest tag and it worked.
> That being said it seems Beam runst tests against kafka 1.0.0 up to 2.5.1 (
> https://github.com/apache/beam/blob/880b10e71b963de6ec20efe614dd866e9a809da4/sdks/java/io/kafka/build.gradle
> )
>
> It does seem we add the 1.0.0 lib to our release which is a really old
> version, I know Matt is currently working on a Beam upgrade for our next
> release so I'll add it to the list to get this sorted out.
>
> It will be safe to replace the kafka-clients jar located under
> plugins/engines/beam/lib with the same jar as from the kafka transform
> located under plugins/transforms/kafka, or even update both to 2.5.1.
>
> Kind regards,
> Hans
>
> On Mon, 25 Apr 2022 at 18:14, monajit choudhury <mo...@gmail.com>
> wrote:
>
>> Hi Hans,
>>
>> Thanks a lot for the guidance. I was able to run it on Flink but looks
>> like there's a issue with the Kafka Consumer
>>
>> Caused by: org.apache.beam.sdk.util.UserCodeException:
>> java.lang.NoSuchMethodError:
>> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>>
>>
>> On analyzing the Fat jar I found that the version of the KafkaConsumer is
>> < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
>> jar should include.
>>
>>
>> Looks like Beam is using an older version of the kafka consumer
>>
>>
>>
>> Thanks
>>
>> Mono
>>
>>
>>
>>
>> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
>> hans.van.akelyen@gmail.com> wrote:
>>
>>> Hi Mono,
>>>
>>> I took a bit of time to set up a test environment on my local system
>>> because we can not always by heart if something actually works (we are
>>> working on more tests in combination with spark/flink/dataflow).
>>> But I can confirm it works with a Flink runner. I do agree that error
>>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>>> server is unavailable. The Flink job then never gets published to the
>>> cluster and you sit there wondering what's going on. When everything is
>>> configured correctly it works as expected.
>>>
>>> I created a sample pipeline using the Beam Kafka consumer and a write to
>>> text file to see if the data is being received in the correct format.
>>>
>>> Pipeline:
>>>
>>>  Screenshot 2022-04-23 at 14.55.06.png
>>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>>
>>> Flink console output:
>>>
>>>  Screenshot 2022-04-23 at 14.47.34.png
>>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>>
>>> Settings I used on the Beam run configuration:
>>>
>>>  Screenshot 2022-04-23 at 14.53.30.png
>>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>>
>>>
>>> Hope you get everything working.
>>> If there is anything more I can do please let me know.
>>>
>>> Kr,
>>> Hans
>>>
>>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
>>> wrote:
>>>
>>>> Hi Hans,
>>>>
>>>> Yeah I realized that apart from AVRO it supports string messages too.
>>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>>  The Java docs says that its only mean to be run with beam runners,
>>>> does it include the Flink runner ?
>>>>
>>>> Apart from that everything works like a charm and we even managed to
>>>> write some custom plugins for our usecases. If we can solve this kafka
>>>> consumer issue,  then we are all set for prime time.
>>>>
>>>> Really appreciate your responses so far.
>>>>
>>>> Thanks
>>>> Mono
>>>>
>>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com>
>>>> wrote:
>>>>
>>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>>
>>>>>
>>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <
>>>>> monojit.c@gmail.com>:
>>>>>
>>>>>> Hi Hans,
>>>>>>
>>>>>> Going through the log files I realized it had something to do with
>>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>>
>>>>>> Thanks
>>>>>> Mono
>>>>>>
>>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>>> hans.van.akelyen@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Monajit,
>>>>>>>
>>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>>
>>>>>>> Another solution (but not yet tested here so not sure it will work)
>>>>>>> is to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>>>> copies".
>>>>>>> More information about this can be found on our documentation pages
>>>>>>> [1]
>>>>>>>
>>>>>>> Kind regards,
>>>>>>> Hans
>>>>>>>
>>>>>>> [1]
>>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>>
>>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>>>>> it using the flink runner I get the following error
>>>>>>>>
>>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>>> accept the Kafka messages
>>>>>>>>
>>>>>>>> I have tried debugging the Hop code and looks like the root cause
>>>>>>>> is the initSubPipeline() method being invoked multiple times while using
>>>>>>>> the Flink runner. That's not the case when I use the local runner. Am I
>>>>>>>> missing something here?
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Monajit Choudhury
>>>>>>>>
>>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>>
>>>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Hans Van Akelyen <ha...@gmail.com>.
Hi Mono,

I tested against the kafka image from bitnami (
https://hub.docker.com/r/bitnami/kafka/) which uses kafka 3.1 on their
latest tag and it worked.
That being said it seems Beam runst tests against kafka 1.0.0 up to 2.5.1 (
https://github.com/apache/beam/blob/880b10e71b963de6ec20efe614dd866e9a809da4/sdks/java/io/kafka/build.gradle
)

It does seem we add the 1.0.0 lib to our release which is a really old
version, I know Matt is currently working on a Beam upgrade for our next
release so I'll add it to the list to get this sorted out.

It will be safe to replace the kafka-clients jar located under
plugins/engines/beam/lib with the same jar as from the kafka transform
located under plugins/transforms/kafka, or even update both to 2.5.1.

Kind regards,
Hans

On Mon, 25 Apr 2022 at 18:14, monajit choudhury <mo...@gmail.com> wrote:

> Hi Hans,
>
> Thanks a lot for the guidance. I was able to run it on Flink but looks
> like there's a issue with the Kafka Consumer
>
> Caused by: org.apache.beam.sdk.util.UserCodeException:
> java.lang.NoSuchMethodError:
> org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord
>
>
> On analyzing the Fat jar I found that the version of the KafkaConsumer is
> < 2.x whereas the plugins folder has 2.4.1, which is the version the fat
> jar should include.
>
>
> Looks like Beam is using an older version of the kafka consumer
>
>
>
> Thanks
>
> Mono
>
>
>
>
> On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
>> Hi Mono,
>>
>> I took a bit of time to set up a test environment on my local system
>> because we can not always by heart if something actually works (we are
>> working on more tests in combination with spark/flink/dataflow).
>> But I can confirm it works with a Flink runner. I do agree that error
>> handling is not ideal, it gets stuck in a waiting loop when the kafka
>> server is unavailable. The Flink job then never gets published to the
>> cluster and you sit there wondering what's going on. When everything is
>> configured correctly it works as expected.
>>
>> I created a sample pipeline using the Beam Kafka consumer and a write to
>> text file to see if the data is being received in the correct format.
>>
>> Pipeline:
>>
>>  Screenshot 2022-04-23 at 14.55.06.png
>> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>>
>> Flink console output:
>>
>>  Screenshot 2022-04-23 at 14.47.34.png
>> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>>
>> Settings I used on the Beam run configuration:
>>
>>  Screenshot 2022-04-23 at 14.53.30.png
>> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>>
>>
>> Hope you get everything working.
>> If there is anything more I can do please let me know.
>>
>> Kr,
>> Hans
>>
>> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
>> wrote:
>>
>>> Hi Hans,
>>>
>>> Yeah I realized that apart from AVRO it supports string messages too.
>>> But the issue is the  beam consumer doesn't consume any messages from kafka
>>> . Even if put garbage in the topic name, it doesn't throw any errors.
>>>  The Java docs says that its only mean to be run with beam runners, does
>>> it include the Flink runner ?
>>>
>>> Apart from that everything works like a charm and we even managed to
>>> write some custom plugins for our usecases. If we can solve this kafka
>>> consumer issue,  then we are all set for prime time.
>>>
>>> Really appreciate your responses so far.
>>>
>>> Thanks
>>> Mono
>>>
>>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com> wrote:
>>>
>>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>>
>>>>
>>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <monojit.c@gmail.com
>>>> >:
>>>>
>>>>> Hi Hans,
>>>>>
>>>>> Going through the log files I realized it had something to do with
>>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>>> issue is it only supports AVRO. I need to consume json messages
>>>>>
>>>>> Thanks
>>>>> Mono
>>>>>
>>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>>> hans.van.akelyen@gmail.com> wrote:
>>>>>
>>>>>> Hi Monajit,
>>>>>>
>>>>>> This is the auto scaling nature of Flink fighting against the
>>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>>> need to know when messages are finished. When running on Flink the best
>>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>>
>>>>>> Another solution (but not yet tested here so not sure it will work)
>>>>>> is to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>>> copies".
>>>>>> More information about this can be found on our documentation pages
>>>>>> [1]
>>>>>>
>>>>>> Kind regards,
>>>>>> Hans
>>>>>>
>>>>>> [1]
>>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>>
>>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>>>> it using the flink runner I get the following error
>>>>>>>
>>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>>> accept the Kafka messages
>>>>>>>
>>>>>>> I have tried debugging the Hop code and looks like the root cause is
>>>>>>> the initSubPipeline() method being invoked multiple times while using the
>>>>>>> Flink runner. That's not the case when I use the local runner. Am I missing
>>>>>>> something here?
>>>>>>>
>>>>>>>
>>>>>>> Thanks
>>>>>>>
>>>>>>> Monajit Choudhury
>>>>>>>
>>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>>
>>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
Hi Hans,

Thanks a lot for the guidance. I was able to run it on Flink but looks like
there's a issue with the Kafka Consumer

Caused by: org.apache.beam.sdk.util.UserCodeException:
java.lang.NoSuchMethodError:
org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecord


On analyzing the Fat jar I found that the version of the KafkaConsumer is <
2.x whereas the plugins folder has 2.4.1, which is the version the fat jar
should include.


Looks like Beam is using an older version of the kafka consumer



Thanks

Mono




On Sat, Apr 23, 2022 at 6:01 AM Hans Van Akelyen <ha...@gmail.com>
wrote:

> Hi Mono,
>
> I took a bit of time to set up a test environment on my local system
> because we can not always by heart if something actually works (we are
> working on more tests in combination with spark/flink/dataflow).
> But I can confirm it works with a Flink runner. I do agree that error
> handling is not ideal, it gets stuck in a waiting loop when the kafka
> server is unavailable. The Flink job then never gets published to the
> cluster and you sit there wondering what's going on. When everything is
> configured correctly it works as expected.
>
> I created a sample pipeline using the Beam Kafka consumer and a write to
> text file to see if the data is being received in the correct format.
>
> Pipeline:
>
>  Screenshot 2022-04-23 at 14.55.06.png
> <https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>
>
> Flink console output:
>
>  Screenshot 2022-04-23 at 14.47.34.png
> <https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>
>
> Settings I used on the Beam run configuration:
>
>  Screenshot 2022-04-23 at 14.53.30.png
> <https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>
>
>
> Hope you get everything working.
> If there is anything more I can do please let me know.
>
> Kr,
> Hans
>
> On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com>
> wrote:
>
>> Hi Hans,
>>
>> Yeah I realized that apart from AVRO it supports string messages too. But
>> the issue is the  beam consumer doesn't consume any messages from kafka .
>> Even if put garbage in the topic name, it doesn't throw any errors.
>>  The Java docs says that its only mean to be run with beam runners, does
>> it include the Flink runner ?
>>
>> Apart from that everything works like a charm and we even managed to
>> write some custom plugins for our usecases. If we can solve this kafka
>> consumer issue,  then we are all set for prime time.
>>
>> Really appreciate your responses so far.
>>
>> Thanks
>> Mono
>>
>> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com> wrote:
>>
>>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>>
>>>
>>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <monojit.c@gmail.com
>>> >:
>>>
>>>> Hi Hans,
>>>>
>>>> Going through the log files I realized it had something to do with
>>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>>> issue is it only supports AVRO. I need to consume json messages
>>>>
>>>> Thanks
>>>> Mono
>>>>
>>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>>> hans.van.akelyen@gmail.com> wrote:
>>>>
>>>>> Hi Monajit,
>>>>>
>>>>> This is the auto scaling nature of Flink fighting against the
>>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>>> need to know when messages are finished. When running on Flink the best
>>>>> solution would be to use the Beam Kafka Consumer.
>>>>>
>>>>> Another solution (but not yet tested here so not sure it will work) is
>>>>> to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>>> copies".
>>>>> More information about this can be found on our documentation pages [1]
>>>>>
>>>>> Kind regards,
>>>>> Hans
>>>>>
>>>>> [1]
>>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>>
>>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>>> it using the flink runner I get the following error
>>>>>>
>>>>>> You can only have one copy of the injector transform 'output' to
>>>>>> accept the Kafka messages
>>>>>>
>>>>>> I have tried debugging the Hop code and looks like the root cause is
>>>>>> the initSubPipeline() method being invoked multiple times while using the
>>>>>> Flink runner. That's not the case when I use the local runner. Am I missing
>>>>>> something here?
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Monajit Choudhury
>>>>>>
>>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>>
>>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Hans Van Akelyen <ha...@gmail.com>.
Hi Mono,

I took a bit of time to set up a test environment on my local system
because we can not always by heart if something actually works (we are
working on more tests in combination with spark/flink/dataflow).
But I can confirm it works with a Flink runner. I do agree that error
handling is not ideal, it gets stuck in a waiting loop when the kafka
server is unavailable. The Flink job then never gets published to the
cluster and you sit there wondering what's going on. When everything is
configured correctly it works as expected.

I created a sample pipeline using the Beam Kafka consumer and a write to
text file to see if the data is being received in the correct format.

Pipeline:

 Screenshot 2022-04-23 at 14.55.06.png
<https://drive.google.com/file/d/1NAFlplLxSaFbgsXpjCFw2MeMOjUuxDkM/view?usp=drive_web>

Flink console output:

 Screenshot 2022-04-23 at 14.47.34.png
<https://drive.google.com/file/d/1Hk6Mp1feFw5iaUbv-ih3F1GFrlySPKda/view?usp=drive_web>

Settings I used on the Beam run configuration:

 Screenshot 2022-04-23 at 14.53.30.png
<https://drive.google.com/file/d/1lkMKz1mV5ovGUrV0xAtSHKxIp7Abad8w/view?usp=drive_web>


Hope you get everything working.
If there is anything more I can do please let me know.

Kr,
Hans

On Sat, 23 Apr 2022 at 05:02, monajit choudhury <mo...@gmail.com> wrote:

> Hi Hans,
>
> Yeah I realized that apart from AVRO it supports string messages too. But
> the issue is the  beam consumer doesn't consume any messages from kafka .
> Even if put garbage in the topic name, it doesn't throw any errors.
>  The Java docs says that its only mean to be run with beam runners, does
> it include the Flink runner ?
>
> Apart from that everything works like a charm and we even managed to write
> some custom plugins for our usecases. If we can solve this kafka consumer
> issue,  then we are all set for prime time.
>
> Really appreciate your responses so far.
>
> Thanks
> Mono
>
> On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com> wrote:
>
>> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>>
>>
>> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <mo...@gmail.com>:
>>
>>> Hi Hans,
>>>
>>> Going through the log files I realized it had something to do with
>>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>>> issue is it only supports AVRO. I need to consume json messages
>>>
>>> Thanks
>>> Mono
>>>
>>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>>> hans.van.akelyen@gmail.com> wrote:
>>>
>>>> Hi Monajit,
>>>>
>>>> This is the auto scaling nature of Flink fighting against the
>>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>>> need to know when messages are finished. When running on Flink the best
>>>> solution would be to use the Beam Kafka Consumer.
>>>>
>>>> Another solution (but not yet tested here so not sure it will work) is
>>>> to force it to a single thread by setting SINGLE_BEAM in the "number of
>>>> copies".
>>>> More information about this can be found on our documentation pages [1]
>>>>
>>>> Kind regards,
>>>> Hans
>>>>
>>>> [1]
>>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>>
>>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2.
>>>>> When I run the pipeline using the local runner, it works fine. But if I run
>>>>> it using the flink runner I get the following error
>>>>>
>>>>> You can only have one copy of the injector transform 'output' to
>>>>> accept the Kafka messages
>>>>>
>>>>> I have tried debugging the Hop code and looks like the root cause is
>>>>> the initSubPipeline() method being invoked multiple times while using the
>>>>> Flink runner. That's not the case when I use the local runner. Am I missing
>>>>> something here?
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> Monajit Choudhury
>>>>>
>>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>>
>>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
Hi Hans,

Yeah I realized that apart from AVRO it supports string messages too. But
the issue is the  beam consumer doesn't consume any messages from kafka .
Even if put garbage in the topic name, it doesn't throw any errors.
 The Java docs says that its only mean to be run with beam runners, does it
include the Flink runner ?

Apart from that everything works like a charm and we even managed to write
some custom plugins for our usecases. If we can solve this kafka consumer
issue,  then we are all set for prime time.

Really appreciate your responses so far.

Thanks
Mono

On Fri, Apr 22, 2022, 15:49 Matt Casters <ma...@neo4j.com> wrote:

> The Beam Kafka Consumer obviously accepts JSON messages as strings.
>
>
> Op vr 22 apr. 2022 17:57 schreef monajit choudhury <mo...@gmail.com>:
>
>> Hi Hans,
>>
>> Going through the log files I realized it had something to do with
>> multithreaded executions. I tried using the  Beam Kafka Consumer but the
>> issue is it only supports AVRO. I need to consume json messages
>>
>> Thanks
>> Mono
>>
>> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
>> hans.van.akelyen@gmail.com> wrote:
>>
>>> Hi Monajit,
>>>
>>> This is the auto scaling nature of Flink fighting against the
>>> requirement of having a single threaded pipeline for Kafka messages (as we
>>> need to know when messages are finished. When running on Flink the best
>>> solution would be to use the Beam Kafka Consumer.
>>>
>>> Another solution (but not yet tested here so not sure it will work) is
>>> to force it to a single thread by setting SINGLE_BEAM in the "number of
>>> copies".
>>> More information about this can be found on our documentation pages [1]
>>>
>>> Kind regards,
>>> Hans
>>>
>>> [1]
>>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>>
>>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to test a simple kafka consumer using Apache Hop v1.2. When
>>>> I run the pipeline using the local runner, it works fine. But if I run it
>>>> using the flink runner I get the following error
>>>>
>>>> You can only have one copy of the injector transform 'output' to accept
>>>> the Kafka messages
>>>>
>>>> I have tried debugging the Hop code and looks like the root cause is
>>>> the initSubPipeline() method being invoked multiple times while using the
>>>> Flink runner. That's not the case when I use the local runner. Am I missing
>>>> something here?
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Monajit Choudhury
>>>>
>>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>>
>>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Matt Casters <ma...@neo4j.com>.
The Beam Kafka Consumer obviously accepts JSON messages as strings.


Op vr 22 apr. 2022 17:57 schreef monajit choudhury <mo...@gmail.com>:

> Hi Hans,
>
> Going through the log files I realized it had something to do with
> multithreaded executions. I tried using the  Beam Kafka Consumer but the
> issue is it only supports AVRO. I need to consume json messages
>
> Thanks
> Mono
>
> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
>> Hi Monajit,
>>
>> This is the auto scaling nature of Flink fighting against the requirement
>> of having a single threaded pipeline for Kafka messages (as we need to know
>> when messages are finished. When running on Flink the best solution would
>> be to use the Beam Kafka Consumer.
>>
>> Another solution (but not yet tested here so not sure it will work) is to
>> force it to a single thread by setting SINGLE_BEAM in the "number of
>> copies".
>> More information about this can be found on our documentation pages [1]
>>
>> Kind regards,
>> Hans
>>
>> [1]
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>
>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to test a simple kafka consumer using Apache Hop v1.2. When
>>> I run the pipeline using the local runner, it works fine. But if I run it
>>> using the flink runner I get the following error
>>>
>>> You can only have one copy of the injector transform 'output' to accept
>>> the Kafka messages
>>>
>>> I have tried debugging the Hop code and looks like the root cause is the
>>> initSubPipeline() method being invoked multiple times while using the Flink
>>> runner. That's not the case when I use the local runner. Am I missing
>>> something here?
>>>
>>>
>>> Thanks
>>>
>>> Monajit Choudhury
>>>
>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>
>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
I just tried the SINGLE_BEAM option but it didn't work. Will try the Beam
consumer now.

Thanks
Mono

On Fri, Apr 22, 2022 at 8:57 AM monajit choudhury <mo...@gmail.com>
wrote:

> Hi Hans,
>
> Going through the log files I realized it had something to do with
> multithreaded executions. I tried using the  Beam Kafka Consumer but the
> issue is it only supports AVRO. I need to consume json messages
>
> Thanks
> Mono
>
> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
>> Hi Monajit,
>>
>> This is the auto scaling nature of Flink fighting against the requirement
>> of having a single threaded pipeline for Kafka messages (as we need to know
>> when messages are finished. When running on Flink the best solution would
>> be to use the Beam Kafka Consumer.
>>
>> Another solution (but not yet tested here so not sure it will work) is to
>> force it to a single thread by setting SINGLE_BEAM in the "number of
>> copies".
>> More information about this can be found on our documentation pages [1]
>>
>> Kind regards,
>> Hans
>>
>> [1]
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>
>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to test a simple kafka consumer using Apache Hop v1.2. When
>>> I run the pipeline using the local runner, it works fine. But if I run it
>>> using the flink runner I get the following error
>>>
>>> You can only have one copy of the injector transform 'output' to accept
>>> the Kafka messages
>>>
>>> I have tried debugging the Hop code and looks like the root cause is the
>>> initSubPipeline() method being invoked multiple times while using the Flink
>>> runner. That's not the case when I use the local runner. Am I missing
>>> something here?
>>>
>>>
>>> Thanks
>>>
>>> Monajit Choudhury
>>>
>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>
>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Matt Casters <ma...@neo4j.com>.
The Beam Kafka Consumer obviously accepts JSON messages as strings.


Op vr 22 apr. 2022 17:57 schreef monajit choudhury <mo...@gmail.com>:

> Hi Hans,
>
> Going through the log files I realized it had something to do with
> multithreaded executions. I tried using the  Beam Kafka Consumer but the
> issue is it only supports AVRO. I need to consume json messages
>
> Thanks
> Mono
>
> On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
> hans.van.akelyen@gmail.com> wrote:
>
>> Hi Monajit,
>>
>> This is the auto scaling nature of Flink fighting against the requirement
>> of having a single threaded pipeline for Kafka messages (as we need to know
>> when messages are finished. When running on Flink the best solution would
>> be to use the Beam Kafka Consumer.
>>
>> Another solution (but not yet tested here so not sure it will work) is to
>> force it to a single thread by setting SINGLE_BEAM in the "number of
>> copies".
>> More information about this can be found on our documentation pages [1]
>>
>> Kind regards,
>> Hans
>>
>> [1]
>> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>>
>> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to test a simple kafka consumer using Apache Hop v1.2. When
>>> I run the pipeline using the local runner, it works fine. But if I run it
>>> using the flink runner I get the following error
>>>
>>> You can only have one copy of the injector transform 'output' to accept
>>> the Kafka messages
>>>
>>> I have tried debugging the Hop code and looks like the root cause is the
>>> initSubPipeline() method being invoked multiple times while using the Flink
>>> runner. That's not the case when I use the local runner. Am I missing
>>> something here?
>>>
>>>
>>> Thanks
>>>
>>> Monajit Choudhury
>>>
>>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>>
>>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by monajit choudhury <mo...@gmail.com>.
Hi Hans,

Going through the log files I realized it had something to do with
multithreaded executions. I tried using the  Beam Kafka Consumer but the
issue is it only supports AVRO. I need to consume json messages

Thanks
Mono

On Fri, Apr 22, 2022 at 12:21 AM Hans Van Akelyen <
hans.van.akelyen@gmail.com> wrote:

> Hi Monajit,
>
> This is the auto scaling nature of Flink fighting against the requirement
> of having a single threaded pipeline for Kafka messages (as we need to know
> when messages are finished. When running on Flink the best solution would
> be to use the Beam Kafka Consumer.
>
> Another solution (but not yet tested here so not sure it will work) is to
> force it to a single thread by setting SINGLE_BEAM in the "number of
> copies".
> More information about this can be found on our documentation pages [1]
>
> Kind regards,
> Hans
>
> [1]
> https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html
>
> On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to test a simple kafka consumer using Apache Hop v1.2. When I
>> run the pipeline using the local runner, it works fine. But if I run it
>> using the flink runner I get the following error
>>
>> You can only have one copy of the injector transform 'output' to accept
>> the Kafka messages
>>
>> I have tried debugging the Hop code and looks like the root cause is the
>> initSubPipeline() method being invoked multiple times while using the Flink
>> runner. That's not the case when I use the local runner. Am I missing
>> something here?
>>
>>
>> Thanks
>>
>> Monajit Choudhury
>>
>> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>>
>

Re: Apache Hop Kafka Consumer throws error when I run pipeline using the Flink Engine

Posted by Hans Van Akelyen <ha...@gmail.com>.
Hi Monajit,

This is the auto scaling nature of Flink fighting against the requirement
of having a single threaded pipeline for Kafka messages (as we need to know
when messages are finished. When running on Flink the best solution would
be to use the Beam Kafka Consumer.

Another solution (but not yet tested here so not sure it will work) is to
force it to a single thread by setting SINGLE_BEAM in the "number of
copies".
More information about this can be found on our documentation pages [1]

Kind regards,
Hans

[1]
https://hop.apache.org/manual/latest/pipeline/beam/getting-started-with-beam.html

On Fri, 22 Apr 2022 at 06:50, monajit choudhury <mo...@gmail.com> wrote:

> Hi,
>
> I am trying to test a simple kafka consumer using Apache Hop v1.2. When I
> run the pipeline using the local runner, it works fine. But if I run it
> using the flink runner I get the following error
>
> You can only have one copy of the injector transform 'output' to accept
> the Kafka messages
>
> I have tried debugging the Hop code and looks like the root cause is the
> initSubPipeline() method being invoked multiple times while using the Flink
> runner. That's not the case when I use the local runner. Am I missing
> something here?
>
>
> Thanks
>
> Monajit Choudhury
>
> Linkedin <https://www.linkedin.com/in/monajit-choudhury-b1409a2/>
>