You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by "Alexey Romanenko (JIRA)" <ji...@apache.org> on 2018/07/20 15:59:00 UTC

[jira] [Commented] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

    [ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16550912#comment-16550912 ] 

Alexey Romanenko commented on BEAM-4632:
----------------------------------------

[~Ricklin] Thank you for all this provided information. Regarding your questions about windows/triggers I'd suggest address them on user@beam.apache.org mailing list.

For your last question about broken pipeline. Could you provide the whole code of your pipeline which reproduces this issue? I'll try to do this on my side.



> KafkaIO seems to fail on streaming mode over spark runner
> ---------------------------------------------------------
>
>                 Key: BEAM-4632
>                 URL: https://issues.apache.org/jira/browse/BEAM-4632
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-spark
>    Affects Versions: 2.4.0
>         Environment: Ubuntu 16.04.4 LTS
>            Reporter: Rick Lin
>            Assignee: Alexey Romanenko
>            Priority: Major
>         Attachments: .withMaxNumRecords(500000).JPG, DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has different number of partitions.JPG, the error GeneratedMessageV3.JPG, the error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of related tools are set in my running program:
> ==================================
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==================================
> My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on my github: [https://github.com/LinRick/beamkafkaIO],
> The description of my situation is as:
> {color:#14892c}The kafka broker is working and kafkaIO.read (consumer) is used to capture data from the assigned broker ip ([http://ubuntu7:9092)|http://ubuntu7:9092)./].{color}
> {color:#14892c}The user manual of kafkaIO SDK (on web:[https://beam.apache.org/documentation/sdks/javadoc/2.4.0/])  indicates that the following parameters need to be set, and then the kafkaIO can work well.{color}
>  {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000} .withTopic("kafkasink"){color}
> {color:#FF0000} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000} .withValueDeserializer(StringDeserializer.class) {color}
> When i run my program with these settings over direct runner, i can find that my program perform well. In addition, my running program is the streaming mode. *However, i run these codes with the same settings (kafkaIO) over spark runner, and my running program is not the streaming mode and is shutdown*. Here, as mentioned on the website: [https://beam.apache.org/documentation/runners/spark/], the performing program will automatically set streaming mode. 
> Unfortunately, it failed for my program.
> On the other hand, If i set the parameter  kafkaIO.read.withMaxNumRecords (1000) or  kafkaIO.read.withMaxReadTime (Duration second), my program will successfully execute as the batch mode (batch processing).
> The steps of performing StarterPipeline.java in my program are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure if this issue is a bug about kafkaIO or I was wrong with some parameter settings over spark runner ?
> I really can't handle it, so I hope to get help from you.
> if any further information is needed, i am glad to be informed and will provide to you as soon as possible.
> I will highly appreciate it if you can help me to deal with this issue.
> i am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)