Posted to user@beam.apache.org by li...@itri.org.tw on 2018/07/30 08:58:26 UTC
A windows/trigger Question with kafkaIO over Spark Runner
Dear all
I have a question about the use of windows/triggers.
My program runs with the following tool versions:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are available on GitHub: https://github.com/LinRick/beamkafkaIO
Test data is produced with Kafka's producer performance test, configured as follows:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================
Its console output is:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 max latency.
...
==================================
We expect about 10,000 records in each one-second window with the following settings in my program StarterPipeline.java:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");
PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
    .withBootstrapServers("ubuntu7:9092")
    .withTopic("kafkasink")
    .withKeyDeserializer(IntegerDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    //.withMaxNumRecords(500000)
    .withoutMetadata());
PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
…
==================================
The processed data is written to PostgreSQL.
The results in the database are as follows:
224 3000
225 0
226 3000
227 0
228 0
236 0
237 0
238 5000
Unfortunately, the results we expected look like this:
224 9000
225 11000
226 9505
227 9829
228 10001
I do not know how to deal with this situation. Could it be caused by data latency?
In addition, I am not sure whether this issue comes from KafkaIO or from wrong settings of my Spark runner, as in issue BEAM-4632 (https://issues.apache.org/jira/browse/BEAM-4632).
If any further information is needed, please let me know and I will provide it as soon as possible.
I would highly appreciate any help with this.
I am looking forward to hearing from you.
Sincerely yours,
Rick
--
This email may contain confidential information of ITRI. Please do not use or disclose it in any way and delete it if you are not the intended recipient.
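As a rough illustration of what FixedWindows.of(Duration.standardSeconds(1)) in the question asks for, the plain-Java sketch below assigns timestamps to one-second windows and counts the elements per window. It is a toy model and not the Beam API: the class name, the method names and the sample timestamps are made up for illustration, and which timestamp KafkaIO attaches to each record depends on how the read is configured.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy model of fixed windows in plain Java; hypothetical names, not the Beam API. */
class FixedWindowSketch {

    /** Start (in millis) of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Counts how many timestamps fall into each window, keyed by window start. */
    static Map<Long, Long> countPerWindow(long[] timestampsMillis, long sizeMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStart(ts, sizeMillis), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical event timestamps in milliseconds.
        long[] timestamps = {0L, 250L, 999L, 1000L, 1500L, 2999L};
        System.out.println(countPerWindow(timestamps, 1000L));
    }
}
```

With a steady 10,000 records per second and timestamps spread evenly, each one-second window would hold about 10,000 elements, which is the result the question expects.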
RE: A windows/trigger Question with kafkaIO over Spark Runner
Posted by Nicolas Viard <ni...@predict.fr>.
Sorry, I didn't see your group function.
In my previous code, I used
.apply(
    Window.<Data>into(new GlobalWindows())
        .triggering(Repeatedly
            .forever(AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(windowSize)
            )
        )
        .withAllowedLateness(Duration.ZERO).discardingFiredPanes()
)
but I don't remember why I had to add a delay of windowSize (1000L here).
Nicolas
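To make the effect of this trigger easier to picture, here is a small plain-Java simulation of a repeated processing-time trigger with discarded panes: the first element of a pane starts a timer, everything that arrives before the timer expires is emitted together, and the next element starts a new pane. This is only a sketch of the semantics as commonly described, not the Beam API and not how the Spark runner schedules micro-batches; the class name, the method name and the arrival times are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy simulation of a repeated "first element in pane + delay" trigger; not the Beam API. */
class ProcessingTimePaneSketch {

    /**
     * Returns the number of elements in each fired pane.
     * arrivalsMillis must be sorted in ascending order (processing-time arrival of each element).
     */
    static List<Integer> paneSizes(long[] arrivalsMillis, long delayMillis) {
        if (delayMillis <= 0) {
            throw new IllegalArgumentException("delayMillis must be positive");
        }
        List<Integer> panes = new ArrayList<>();
        int i = 0;
        while (i < arrivalsMillis.length) {
            // The first element of a pane sets the firing time.
            long fireAt = arrivalsMillis[i] + delayMillis;
            int count = 0;
            // Everything arriving before the timer expires joins the same pane.
            while (i < arrivalsMillis.length && arrivalsMillis[i] < fireAt) {
                count++;
                i++;
            }
            // Discarding mode: the pane is emitted and its contents are dropped.
            panes.add(count);
        }
        return panes;
    }

    public static void main(String[] args) {
        // Hypothetical arrival times in milliseconds.
        long[] arrivals = {0L, 100L, 900L, 1000L, 1700L, 2500L, 2600L};
        System.out.println(paneSizes(arrivals, 1000L));
    }
}
```

Note that panes produced this way are aligned to the arrival of their first element rather than to wall-clock second boundaries, unlike fixed windows.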
________________________________
From: linrick@itri.org.tw
Sent: Tuesday, July 31, 2018 10:24:04
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Dear Nicolas,
I think that we need the following code:
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
to split the data into fixed windows for a streaming data source (Kafka).
If we do not use the window function, the following error occurs:
Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
Rick
From: Nicolas Viard [mailto:nicolas.viard@predict.fr]
Sent: Tuesday, July 31, 2018 3:52 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Hi Rick,
Can you try removing
options.setMaxRecordsPerBatch(1000L);
(or setting it to a value greater than 10000L) and
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
?
I think you don't need a window function, because Spark already splits your input into micro-batches, and things like discardingFiredPanes or withAllowedLateness(Duration.ZERO) are Spark's default (or only?) behaviour.
(You can use Window.into to split a batch into smaller windows.)
Nicolas
________________________________
From: linrick@itri.org.tw
Sent: Tuesday, July 31, 2018 07:59:29
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Dear Nicolas,
Yes, I have set this configuration:
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData
    .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
        .triggering(AfterWatermark.pastEndOfWindow()
            .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes());
However, the result is as follows:
1158 1000
1159 0
1160 0
1161 0
1162 0
1163 0
1164 1000
1165 0
1166 0
1167 0
1168 0
1169 0
1170 0
…
Rick
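The counts of 1000 listed above are consistent with a simple ceiling: if at most maxRecordsPerBatch records are read per micro-batch and one micro-batch runs per batch interval, throughput cannot exceed their ratio. The sketch below works through that arithmetic for the settings mentioned in this thread. It assumes the cap applies to the whole micro-batch (how the runner divides it across partitions is not modelled), and the class and method names are hypothetical.

```java
/** Back-of-the-envelope throughput ceiling for a capped micro-batch; hypothetical helper. */
class BatchCeilingSketch {

    /** Upper bound on records read per second under the stated assumptions. */
    static long maxRecordsPerSecond(long maxRecordsPerBatch, long batchIntervalMillis) {
        if (maxRecordsPerBatch <= 0 || batchIntervalMillis <= 0) {
            throw new IllegalArgumentException("both arguments must be positive");
        }
        return maxRecordsPerBatch * 1000L / batchIntervalMillis;
    }

    public static void main(String[] args) {
        long producerRate = 10_000L; // --throughput 10000 from the producer perf test

        // Settings from the thread: setMaxRecordsPerBatch(1000L), setBatchIntervalMillis(1000L).
        long capped = maxRecordsPerSecond(1_000L, 1_000L);
        System.out.println("cap 1000 per batch:  " + capped + " records/s, producer " + producerRate);

        // A cap of 10000 per batch only matches the producer rate and leaves no headroom.
        long matched = maxRecordsPerSecond(10_000L, 1_000L);
        System.out.println("cap 10000 per batch: " + matched + " records/s, producer " + producerRate);
    }
}
```

Under these assumptions a cap of 1000 records per one-second batch can never keep up with a producer sending 10,000 records per second.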
From: Nicolas Viard [mailto:nicolas.viard@predict.fr]
Sent: Monday, July 30, 2018 5:35 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Hello,
I think Spark has a default windowing strategy and pulls data from Kafka every X ms.
You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).
Best regards,
Nicolas
RE: A windows/trigger Question with kafkaIO over Spark Runner
Posted by li...@itri.org.tw.
Hi Nicolas,
Thank you for spotting the problems in my program.
Your suggestion was very useful.
The Spark pipeline settings in my program are now:
============================================================
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(10000L);
options.setBatchIntervalMillis(1000L);
//options.setSparkMaster("local[*]");
options.setSparkMaster("spark://ubuntu8:7077");
…
apply(Window.<KV<Integer, String>>into(new GlobalWindows())
    .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
        .plusDelayOf(Duration.standardSeconds(1)))) // windowSize (1000L)
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes()
);
============================================================
Here I run the Spark runner on a Spark standalone cluster with two worker nodes (ubuntu8 and ubuntu9), each hosting two workers:
Workers (4)
Worker Id              Address         State   Cores        Memory
worker-ubuntu9-41086   ubuntu9:41086   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu9-34615   ubuntu9:34615   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu8-45286   ubuntu8:45286   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu8-39776   ubuntu8:39776   ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
The result is:
Uuid count
6132 8758
6133 4300
6134 4960
6135 8860
6136 10000
6137 9480
6138 9830
6139 10000
6140 10001
6141 10000
6142 10000
6143 10000
6144 10000
6145 10000
6146 10000
6147 10000
6148 10000
6149 10000
6150 10000
6151 10000
The result shows that
1. The number of records per window gradually stabilizes at 10,000.
2. The data is delayed.
On the Kafka side, the producer performance test has already finished: 10000000 records sent, 9999.920001 records/sec (0.95 MB/sec), 0.19 ms avg latency, 144.00 ms max latency, 0 ms 50th, 1 ms 95th, 1 ms 99th, 1 ms 99.9th.
Meanwhile, my program on the Spark runner is still processing and counting records.
Do you have any suggestions for reducing this delay?
Maybe I need to increase the number of worker nodes or revise the settings in Spark's spark-defaults.conf?
Rick
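One way to quantify the delay described above is to add up, row by row, how far each count falls short of the producer rate. The sketch below does this for the 20 rows listed in the message, assuming each row covers one second of input and the producer sends exactly 10,000 records per second; both are assumptions, and the shortfall is relative to the first listed row, since the state before row 6132 is not shown. The class and method names are hypothetical.

```java
/** Running shortfall of consumed versus produced records; hypothetical helper. */
class BacklogSketch {

    /** Counts for rows 6132 to 6151 as listed in the message. */
    static final long[] OBSERVED = {
        8758, 4300, 4960, 8860, 10000, 9480, 9830, 10000, 10001, 10000,
        10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000
    };

    /** For each row, the cumulative number of records produced but not yet counted. */
    static long[] cumulativeShortfall(long[] countsPerSecond, long producedPerSecond) {
        long[] backlog = new long[countsPerSecond.length];
        long running = 0L;
        for (int i = 0; i < countsPerSecond.length; i++) {
            running += producedPerSecond - countsPerSecond[i];
            backlog[i] = running;
        }
        return backlog;
    }

    public static void main(String[] args) {
        long[] backlog = cumulativeShortfall(OBSERVED, 10_000L);
        for (int i = 0; i < backlog.length; i++) {
            System.out.println((6132 + i) + " backlog=" + backlog[i]);
        }
    }
}
```

Under these assumptions the shortfall builds up during the first rows and then stays flat: once the per-row count equals the producer rate, the pipeline neither falls further behind nor catches up. If setMaxRecordsPerBatch(10000L) caps each one-second batch at the producer rate, that would explain why the backlog does not shrink, but this is an interpretation and not something confirmed in the thread.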
RE: A windows/trigger Question with kafkaIO over Spark Runner
Posted by li...@itri.org.tw.
Dear Nicolas,
I think that we need the following code:
“apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());”
to split data into fixed Windows for a streaming data source (kafka).
IF we do not use the window function, then the error will occur as:
Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
Rick
From: Nicolas Viard [mailto:nicolas.viard@predict.fr]
Sent: Tuesday, July 31, 2018 3:52 PM
To: user@beam.apache.org
Subject: RE: A windows/trigger Question with kafkaIO over Spark Runner
Hi Rick,
Can you try to remove
options.setMaxRecordsPerBatch(1000L);
(or set it to x>10000L) and
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
?
I think you don't need to use a window function because your input is splitted in micro-batches by Spark and things like discardingFiredPanes or withAllowedLateness(Duration.ZERO) are Spark's default (/only?) behaviour.
(You can use a Window.of to split a batch into smaller windows.)
Nicolas
RE: A windows/trigger Question with kafkaIO over Spark Runner
Posted by Nicolas Viard <ni...@predict.fr>.
Hi Rick,
Can you try removing
options.setMaxRecordsPerBatch(1000L);
(or setting it to a value greater than 10000L) and
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
?
I think you don't need a window function here, because Spark already splits your input into micro-batches, and things like discardingFiredPanes() or withAllowedLateness(Duration.ZERO) are Spark's default (or only?) behaviour.
(You can use Window.into to split a batch into smaller windows.)
Nicolas
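A back-of-the-envelope sketch, in plain Java with no Beam or Spark dependency, of why the per-batch cap might matter here. It assumes a simplified model in which the runner performs one read per batch interval and reads at most maxRecordsPerBatch records each time; this is an illustration, not a description of the Spark runner's internals. The class and method names are hypothetical, and treating a non-positive cap as "no cap" is also an assumption.

```java
/** Hypothetical helper: rough throughput bound for a capped micro-batch reader. */
final class MicroBatchEstimate {
    private MicroBatchEstimate() {}

    /**
     * Upper bound on records consumed per second, assuming exactly one read per
     * batch interval and at most maxRecordsPerBatch records per read.
     * A non-positive cap is treated as "no cap" (assumption of this sketch).
     */
    static long maxConsumedPerSecond(long maxRecordsPerBatch,
                                     long batchIntervalMillis,
                                     long inputRatePerSecond) {
        if (batchIntervalMillis <= 0) {
            throw new IllegalArgumentException("batchIntervalMillis must be positive");
        }
        if (maxRecordsPerBatch <= 0) {
            return inputRatePerSecond;
        }
        // Records per second allowed by the cap alone.
        long cappedRate = maxRecordsPerBatch * 1000L / batchIntervalMillis;
        // The reader cannot consume more than the producer sends.
        return Math.min(cappedRate, inputRatePerSecond);
    }

    public static void main(String[] args) {
        // Values taken from the thread: cap 1000, interval 1000 ms, producer at 10000 records/s.
        System.out.println(maxConsumedPerSecond(1000L, 1000L, 10000L));
        // Same setup with a cap above the producer rate.
        System.out.println(maxConsumedPerSecond(20000L, 1000L, 10000L));
    }
}
```

Under this model, a cap of 1000 with a 1000 ms interval bounds consumption at 1000 records per second, far below the 10,000 records per second the producer sends, which would be in line with the 1000-record counts reported in the thread. Raising the cap above the producer rate removes that bound.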
RE: A windows/trigger Question with kafkaIO over Spark Runner
Posted by li...@itri.org.tw.
Dear Nicolas,
Yes, I have set this configuration as follows:
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData.
apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
.triggering(AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes());
However, the results are as follows:
1158 1000
1159 0
1160 0
1161 0
1162 0
1163 0
1164 1000
1165 0
1166 0
1167 0
1168 0
1169 0
1170 0
...
Rick
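For reference, a minimal plain-Java sketch of what counting elements per one-second fixed window means, independent of Beam and Spark. Each timestamp is mapped to the start of the window that contains it, and elements are tallied per window. The class and method names are hypothetical, the timestamps are made-up sample values, and the sketch ignores triggers, watermarks, and lateness entirely.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: tally elements per fixed window by event timestamp. */
final class FixedWindowCount {
    private FixedWindowCount() {}

    /** Start (in ms) of the fixed window of the given size that contains the timestamp. */
    static long windowStartMillis(long timestampMillis, long windowSizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well.
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Number of elements per window start, ordered by window start. */
    static Map<Long, Long> countPerWindow(long[] timestampsMillis, long windowSizeMillis) {
        Map<Long, Long> counts = new TreeMap<>();
        for (long ts : timestampsMillis) {
            counts.merge(windowStartMillis(ts, windowSizeMillis), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up sample timestamps in milliseconds, with one-second windows.
        long[] timestamps = {0L, 250L, 999L, 1000L, 1500L, 3100L};
        System.out.println(countPerWindow(timestamps, 1000L)); // prints {0=3, 1000=2, 3000=1}
    }
}
```

In this sketch a window that receives no elements simply has no entry in the map, so the window starting at 2000 ms does not appear in the output.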
RE: A windows/trigger Question with kafkaIO over Spark Runner
Posted by Nicolas Viard <ni...@predict.fr>.
Hello,
I think Spark has a default windowing strategy and pulls data from Kafka every X ms.
You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000L).
Best regards,
Nicolas