Posted to user@beam.apache.org by li...@itri.org.tw on 2018/07/30 08:58:26 UTC

A windows/trigger Question with kafkaIO over Spark Runner

Dear all

I have a question about the use of windows/triggers.
The following versions of the related tools are used in my program:
==================================
Beam 2.4.0 (Direct runner and Spark runner)
Spark 2.3.1 (local mode)
Kafka: 2.11-0.10.1.1
scala: 2.11.8
java: 1.8
==================================
My programs (KafkaToKafka.java and StarterPipeline.java) are as shown on Github, as: https://github.com/LinRick/beamkafkaIO
The configuration setting of Kafka broker is:
==================================
/kafka_broker/bin/kafka-producer-perf-test.sh \
--num-records 10000000 \
--record-size 100 \
--topic kafkasink \
--throughput 10000 \
--producer-props acks=0 bootstrap.servers=ubuntu7:9092 batch.size=1000
==================================

The display of Kafka broker on console is as:
==================================
...
49992 records sent, 9998.4 records/sec (0.95 MB/sec), 1.0 ms avg latency, 146.0 ms max latency.
50040 records sent, 10008.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 5.0 ms max latency.
50019 records sent, 10001.8 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 ms max latency.
50011 records sent, 10002.2 records/sec (0.95 MB/sec), 0.2 ms avg latency, 3.0 ms max latency.
50020 records sent, 10002.0 records/sec (0.95 MB/sec), 0.2 ms avg latency, 1.0 ms max latency.
...
==================================

We expect about 10,000 records in each one-second window with the following settings in my program StarterPipeline.java:
==================================
…
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setSparkMaster("local[*]");

PCollection<KV<Integer, String>> readData = p.apply(KafkaIO.<Integer, String>read()
  .withBootstrapServers("ubuntu7:9092")
  .withTopic("kafkasink")
  .withKeyDeserializer(IntegerDeserializer.class)
  .withValueDeserializer(StringDeserializer.class)
  //.withMaxNumRecords(500000)
  .withoutMetadata());

PCollection<KV<Integer, String>> readData1 = readData.
  apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
      .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes());
…
==================================
The processed data are imported into PostgreSQL.
The results in the DB (uuid, count) are as follows.
224  3000
225  0
226  3000
227  0
228  0
236  0
237  0
238  5000

Unfortunately, the results we were expecting are:
224  9000
225  11000
226  9505
227  9829
228  10001

I do not know how to deal with this situation; maybe it is related to data latency?

In addition, I am not sure whether this issue is related to KafkaIO or to my Spark runner settings, as in the issue BEAM-4632<https://issues.apache.org/jira/browse/BEAM-4632>.

If any further information is needed, please let me know and I will provide it as soon as possible.

I would highly appreciate your help in overcoming this.

I am looking forward to hearing from you.

Sincerely yours,

Rick


--
本信件可能包含工研院機密資訊,非指定之收件者,請勿使用或揭露本信件內容,並請銷毀此信件。 This email may contain confidential information. Please do not use or disclose it in any way and delete it if you are not the intended recipient.

RE: A windows/trigger Question with kafkaIO over Spark Runner

Posted by Nicolas Viard <ni...@predict.fr>.
Sorry, I didn't see your group function.

In my previous code, I used

.apply(Window.<Data>into(new GlobalWindows())
    .triggering(Repeatedly
        .forever(AfterProcessingTime
            .pastFirstElementInPane()
            .plusDelayOf(windowSize)))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())

but I don't remember why I had to add a delay of windowSize (1000L here).
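
Applied to your pipeline, it would look roughly like this (just a sketch, assuming your readData PCollection and a Count.perKey aggregation at the end):

PCollection<KV<Integer, Long>> counts = readData
    .apply(Window.<KV<Integer, String>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardSeconds(1))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(Count.perKey());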



RE: A windows/trigger Question with kafkaIO over Spark Runner

Posted by li...@itri.org.tw.
Hi Nicolas,

I appreciate that you’ve found some problems in my program.

Your suggestion is great and useful for me.

Now, the Spark pipeline settings in my program are:
============================================================
SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(10000L);
options.setBatchIntervalMillis(1000L);
//options.setSparkMaster("local[*]");
options.setSparkMaster("spark://ubuntu8:7077");
…
apply(Window.<KV<Integer, String>>into(new GlobalWindows())
     .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
       .plusDelayOf(Duration.standardSeconds(1)))) // windowSize (1000L)
     .withAllowedLateness(Duration.ZERO)
     .discardingFiredPanes()
);
============================================================


Here, I run the Spark runner on Spark in standalone mode (with two worker nodes):

Workers (4)
Worker Id               Address          State   Cores        Memory
worker-ubuntu9-41086    ubuntu9:41086    ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu9-34615    ubuntu9:34615    ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu8-45286    ubuntu8:45286    ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)
worker-ubuntu8-39776    ubuntu8:39776    ALIVE   2 (2 Used)   4.0 GB (4.0 GB Used)

and the result is listed as:
Uuid         count
6132        8758
6133        4300
6134        4960
6135        8860
6136        10000
6137        9480
6138        9830
6139        10000
6140        10001
6141        10000
6142        10000
6143        10000
6144        10000
6145        10000
6146        10000
6147        10000
6148        10000
6149        10000
6150        10000
6151        10000

The results show that:

1. The total amount of data per window gradually stabilizes at 10,000.

2. There is some data delay.

On the Kafka broker, the producer test has already finished: 10000000 records sent, 9999.920001 records/sec (0.95 MB/sec), 0.19 ms avg latency, 144.00 ms max latency, 0 ms 50th, 1 ms 95th, 1 ms 99th, 1 ms 99.9th.

Meanwhile, my program on the Spark runner is still processing and counting the data.

If I want to reduce this data delay, do you have any suggestions?

Maybe I need to increase the number of worker nodes or revise the settings in spark-defaults.conf?
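
For example, these are the kinds of spark-defaults.conf settings I am thinking of trying (placeholder values only, not tested yet):

# Placeholder values only; to be adjusted to the actual cluster.
spark.executor.cores                  2
spark.executor.memory                 4g
spark.default.parallelism             8
# Let Spark Streaming throttle the Kafka ingest rate when batches fall behind.
spark.streaming.backpressure.enabled  true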

Rick



RE: A windows/trigger Question with kafkaIO over Spark Runner

Posted by li...@itri.org.tw.
Dear Nicolas,

I think that we need the following code:
“apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
      .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());”

to split the data into fixed windows for a streaming data source (Kafka).

If we do not use the window function, the following error occurs:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
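
For reference, a minimal sketch of the kind of aggregation that hits this error (assuming a Count.perKey step, which uses GroupByKey internally; the full code is in StarterPipeline.java on GitHub):

// Counting per key relies on GroupByKey, so the unbounded Kafka input
// must first be windowed or given a trigger, as above.
PCollection<KV<Integer, Long>> countData = readData1.apply(Count.perKey());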

Rick



RE: A windows/trigger Question with kafkaIO over Spark Runner

Posted by Nicolas Viard <ni...@predict.fr>.
Hi Rick,

Can you try to remove

options.setMaxRecordsPerBatch(1000L);

(or set it to x>10000L) and
  apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))

      .triggering(AfterWatermark.pastEndOfWindow()

        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))

      .withAllowedLateness(Duration.ZERO)

      .discardingFiredPanes());

?

I think you don't need to use a window function because your input is split into micro-batches by Spark, and things like discardingFiredPanes or withAllowedLateness(Duration.ZERO) are Spark's default (/only?) behaviour.

(You can use a Window.of to split a batch into smaller windows.)


Nicolas



RE: A windows/trigger Question with kafkaIO over Spark Runner

Posted by li...@itri.org.tw.
Dear Nicolas,

Yes, I have set this configuration, as follows:

Pipeline p = Pipeline.create(options);
options.setMaxRecordsPerBatch(1000L);
options.setBatchIntervalMillis(1000L);
options.setSparkMaster("local[*]");
…
PCollection<KV<Integer, String>> readData1 = readData.
    apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardSeconds(1)))
      .triggering(AfterWatermark.pastEndOfWindow()
        .withLateFirings(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

However, the result is as follows.
“1158      1000
1159        0
1160        0
1161        0
1162        0
1163        0
1164        1000
1165        0
1166        0
1167        0
1168        0
1169        0
1170        0
….”

Rick


RE: A windows/trigger Question with kafkaIO over Spark Runner

Posted by Nicolas Viard <ni...@predict.fr>.
Hello,

I think Spark has a default windowing strategy and pulls data from kafka every X ms.

You can override it using SparkPipelineOptions.setBatchIntervalMillis(1000).
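
For example, a minimal sketch based on your SparkPipelineOptions (the 10000 below is only an illustration matching your ~10,000 records/sec input):

SparkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(SparkPipelineOptions.class);
// Pull one micro-batch from Kafka every second instead of the default interval.
options.setBatchIntervalMillis(1000L);
// Let each micro-batch hold a full second of input at ~10,000 records/sec.
options.setMaxRecordsPerBatch(10000L);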

Best regards,

Nicolas
