Posted to commits@beam.apache.org by "Rick Lin (JIRA)" <ji...@apache.org> on 2018/07/30 06:57:00 UTC

[jira] [Comment Edited] (BEAM-4632) KafkaIO seems to fail on streaming mode over spark runner

    [ https://issues.apache.org/jira/browse/BEAM-4632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561540#comment-16561540 ] 

Rick Lin edited comment on BEAM-4632 at 7/30/18 6:56 AM:
---------------------------------------------------------

By the way, this is the kafka-producer-perf-test command I use to send test data to the Kafka broker:

 
{code:bash}
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000{code}
The console output of the producer test is:

...

49992 records sent, {color:#d04437}9998.4 records/sec{color} (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, {color:#d04437}10008.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, {color:#d04437}10001.8 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, {color:#d04437}10002.2 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, {color:#d04437}10002.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...
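
As a rough sanity check on these figures, the reported rate can be re-derived from the command-line options. This is only a sketch: the class and method names are made up for illustration and are not part of Kafka or Beam, and it assumes the tool counts 1 MB as 1024 * 1024 bytes, which is consistent with the 0.95 MB/sec shown for 10000 records/sec at 100 bytes each.

```java
import java.util.Locale;

/** Hypothetical helper (not part of Kafka or Beam) that re-derives the perf-test figures. */
class PerfTestCheck {

    /** Payload rate in MB/sec, assuming 1 MB = 1024 * 1024 bytes. */
    static double mbPerSecond(long recordsPerSecond, int recordSizeBytes) {
        return recordsPerSecond * (double) recordSizeBytes / (1024.0 * 1024.0);
    }

    /** Seconds needed to send all records at the target rate. */
    static double expectedDurationSeconds(long numRecords, long recordsPerSecond) {
        return numRecords / (double) recordsPerSecond;
    }

    public static void main(String[] args) {
        long numRecords = 10_000_000L; // --num-records
        int recordSize = 100;          // --record-size, in bytes
        long throughput = 10_000L;     // --throughput, in records/sec

        // Prints "0.95 MB/sec"
        System.out.println(String.format(Locale.ROOT, "%.2f MB/sec",
                mbPerSecond(throughput, recordSize)));
        // Prints "1000 seconds"
        System.out.println(String.format(Locale.ROOT, "%.0f seconds",
                expectedDurationSeconds(numRecords, throughput)));
    }
}
```

So, under these assumptions, the producer should keep feeding the topic for roughly 1000 seconds, which should leave enough time to observe whether the consumer side stays in streaming mode.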

Rick

 

 


was (Author: ricklin):
By the way, this is the kafka-producer-perf-test command I use to send test data to the Kafka broker:

 
{code:bash}
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0  \
bootstrap.servers=ubuntu7:9092  \
batch.size=1000{code}
The console output of the producer test is:

...

49992 records sent, {color:#d04437}9998.4 records/sec{color} (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, {color:#d04437}10008.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, {color:#d04437}10001.8 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, {color:#d04437}10002.2 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, {color:#d04437}10002.0 records/sec{color} (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.

...

Rick

 

 

> KafkaIO seems to fail on streaming mode over spark runner
> ---------------------------------------------------------
>
>                 Key: BEAM-4632
>                 URL: https://issues.apache.org/jira/browse/BEAM-4632
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kafka, runner-spark
>    Affects Versions: 2.4.0
>         Environment: Ubuntu 16.04.4 LTS
>            Reporter: Rick Lin
>            Assignee: Alexey Romanenko
>            Priority: Major
>         Attachments: .withMaxNumRecords(500000).JPG, DB_table_kafkabeamdata_count.JPG, error UnboundedDataset.java 81(0) has different number of partitions.JPG, the error GeneratedMessageV3.JPG, the error GeneratedMessageV3.JPG
>
>
> Dear sir,
> The following versions of the related tools are used in my program:
> ==================================
> Beam 2.4.0 (Direct runner and Spark runner)
> Spark 2.2.1 (local mode and standalone mode)
> Kafka: 2.11-0.10.1.1
> scala: 2.11.8
> java: 1.8
> ==================================
> My programs (KafkaToKafka.java and StarterPipeline.java) are available on my GitHub: [https://github.com/LinRick/beamkafkaIO].
> My situation is as follows:
> {color:#14892c}The Kafka broker is running, and KafkaIO.read (the consumer) is used to read data from the assigned broker address ([http://ubuntu7:9092]).{color}
> {color:#14892c}The KafkaIO documentation in the SDK Javadoc ([https://beam.apache.org/documentation/sdks/javadoc/2.4.0/]) indicates that the following parameters must be set for KafkaIO to work:{color}
>  {color:#FF0000}.withBootstrapServers("kafka broker ip:9092"){color}
> {color:#FF0000} .withTopic("kafkasink"){color}
> {color:#FF0000} .withKeyDeserializer(IntegerDeserializer.class){color}
> {color:#FF0000} .withValueDeserializer(StringDeserializer.class) {color}
> When I run my program with these settings on the direct runner, it works well and runs in streaming mode. *However, when I run the same code with the same KafkaIO settings on the Spark runner, the program does not run in streaming mode and shuts down.* According to the website [https://beam.apache.org/documentation/runners/spark/], the program should switch to streaming mode automatically.
> Unfortunately, that did not happen for my program.
> On the other hand, if I set kafkaIO.read.withMaxNumRecords(1000) or kafkaIO.read.withMaxReadTime(Duration second), my program runs successfully in batch mode (batch processing).
> The steps for running StarterPipeline.java are:
> step1 mvn compile exec:java -Dexec.mainClass=com.itri.beam.StarterPipeline -Pspark2-runner -Dexec.args="--runner=SparkRunner"
> step2 mvn clean package
> step3 cp -rf target/beamkafkaIO-0.1.jar /root/
> step4 cd /spark-2.2.1-bin-hadoop2.6/bin
> step5 ./spark-submit --class com.itri.beam.StarterPipeline --master local[4] /root/beamkafkaIO-0.1.jar --runner=SparkRunner
> I am not sure whether this is a bug in KafkaIO or whether I set some parameter incorrectly for the Spark runner.
> I have not been able to resolve this myself, so I hope to get help from you.
> If any further information is needed, please let me know and I will provide it as soon as possible.
> I would greatly appreciate any help with this issue.
> I am looking forward to hearing from you.
>  
> Sincerely yours,
>  
> Rick
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)