Posted to dev@beam.apache.org by rahul patwari <ra...@gmail.com> on 2020/03/20 04:53:19 UTC

Latency in advancing Spark Watermarks

Hi,

*Usage Info*:
We are using Beam 2.16.0 and Spark 2.4.2.
We are running Spark on Kubernetes.
We are using the Spark Streaming (legacy) runner with the Beam Java SDK.
The pipeline is run with the default configurations, i.e. the default
values for SparkPipelineOptions.
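
For reference, we create the options along these lines (a minimal sketch;
we do not override any of the defaults, e.g. batchIntervalMillis stays at
500 ms):

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class CreatePipeline {
  public static void main(String[] args) {
    SparkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(SparkPipelineOptions.class);
    options.setRunner(SparkRunner.class);
    // All SparkPipelineOptions keep their defaults, e.g. the 500 ms
    // micro-batch interval (batchIntervalMillis).
    Pipeline pipeline = Pipeline.create(options);
    // ... build the pipeline and call pipeline.run() ...
  }
}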

*Issue*:
When a Beam pipeline is submitted to Spark, there is latency in getting the
results of an aggregation, and this latency increases with every new
window. The pipeline is run with 1-minute fixed windows and the default
trigger (which fires when the watermark passes the end of the window). The
same pipeline works as expected on the Flink and Direct runners. A
simplified sketch of the aggregation is shown below.
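
For context, the windowed aggregation has the following shape (the input
PCollection and the Count aggregation are placeholders for our actual
source and transforms):

import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class WindowedAggregation {
  // 1-minute fixed windows with the default trigger, which fires once the
  // watermark passes the end of the window.
  static PCollection<KV<String, Long>> windowedCounts(PCollection<String> events) {
    return events
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Count.perElement());
  }
}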

*Initial Analysis*:
The watermarks for a batch (500 ms batch interval by default) are broadcast
when the onBatchCompleted()[1] event for the batch is triggered. We are
observing latency between the time when a batch completes and the time when
the onBatchCompleted() event is triggered for that batch.

For example, if a batch completes at 09:29:30 (HH:MM:SS), the
onBatchCompleted() event for that batch is triggered at 09:29:35, i.e. a
5-second delay. We determined the batch completion times from the Spark
driver UI and the driver logs. I have asked a question about this latency
on the Spark user mailing list[2] and am waiting for a response.
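
To quantify the gap independently of Beam, one can register an extra
streaming listener that logs its own invocation time next to the batch's
processing end time. A minimal sketch using Spark's JavaStreamingListener
(the listener class name is ours; it would be registered on the
JavaStreamingContext via JavaStreamingListenerWrapper, the same way Beam's
SparkRunner registers its watermark listener):

import org.apache.spark.streaming.api.java.JavaStreamingListener;
import org.apache.spark.streaming.api.java.JavaStreamingListenerBatchCompleted;

public class BatchCompletionLagListener extends JavaStreamingListener {
  @Override
  public void onBatchCompleted(JavaStreamingListenerBatchCompleted batchCompleted) {
    // Time at which Spark finished processing the batch, per the batch info.
    long processingEndMs = batchCompleted.batchInfo().processingEndTime();
    // Time at which this callback actually fired.
    long callbackMs = System.currentTimeMillis();
    System.out.println("Batch " + batchCompleted.batchInfo().batchTime().milliseconds()
        + ": processing ended at " + processingEndMs
        + ", onBatchCompleted fired at " + callbackMs
        + " (lag " + (callbackMs - processingEndMs) + " ms)");
  }
}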

The watermarks for each batch are added to a queue. When the
onBatchCompleted() event is triggered for any batch, the watermarks are
polled from the queue and broadcast[3]. Because of the delay between batch
completion and the onBatchCompleted() event, there is a delay in advancing
the watermarks, and therefore a latency in emitting the results of the
aggregation. As the pipeline progresses, watermarks are added to the queue
faster than they are polled and broadcast, and the delay between batch
completion and the onBatchCompleted() event keeps growing (see the toy
model below).
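
As a toy model of this rate mismatch (purely illustrative, not Beam code):
if a watermark is enqueued once per 500 ms batch but the late-firing
onBatchCompleted() callback effectively polls less often (600 ms is a
hypothetical number), the backlog, and with it the watermark lag, grows
without bound:

import java.util.ArrayDeque;
import java.util.Queue;

public class WatermarkBacklogModel {
  public static void main(String[] args) {
    Queue<Long> queuedWatermarks = new ArrayDeque<>();
    final long batchIntervalMs = 500;       // one watermark enqueued per batch
    final long callbackIntervalMs = 600;    // hypothetical effective callback rate
    final long horizonMs = 10 * 60 * 1000;  // simulate 10 minutes

    for (long t = 0; t < horizonMs; t += 100) {
      if (t % batchIntervalMs == 0) {
        queuedWatermarks.add(t); // watermark for the batch ending at t
      }
      if (t % callbackIntervalMs == 0 && !queuedWatermarks.isEmpty()) {
        queuedWatermarks.poll(); // one watermark broadcast per callback
      }
    }
    // With enqueues every 500 ms and polls every 600 ms, the backlog grows
    // linearly, so the broadcast watermark falls further and further behind.
    System.out.println("backlog after 10 minutes: " + queuedWatermarks.size());
  }
}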

*Logs*:
These are trimmed logs that show the issue in action:

*11:37:06*: INFO: scheduler.JobScheduler: "Finished job streaming job
*1584099237500* ms.4 from job set of time 1584099237500 ms"
*11:37:06*: INFO: scheduler.JobScheduler: "Total delay: 189.352 s for time
*1584099237500* ms (execution: 0.942 s)"
*11:40:30*: INFO: util.GlobalWatermarkHolder: "Put new watermark block:
{0=SparkWatermarks{lowWatermark=2020-03-13T11:37:03.621Z,
highWatermark=2020-03-13T11:37:04.882Z,
synchronizedProcessingTime=2020-03-13T11:33:57.500Z},
1=SparkWatermarks{lowWatermark=2020-03-13T11:37:03.872Z,
highWatermark=2020-03-13T11:37:05.107Z,
synchronizedProcessingTime=2020-03-13T11:33:57.500Z},
2=SparkWatermarks{lowWatermark=2020-03-13T11:37:04.204Z,
highWatermark=2020-03-13T11:37:05.377Z,
synchronizedProcessingTime=2020-03-13T11:33:57.500Z}}"
*11:40:30*: INFO:
util.GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: "Batch with
timestamp: *1584099237500* has completed, watermarks have been updated."

As you can see, there is a delay of almost *3 minutes 24 seconds* between
the time when the batch completed and the time when the onBatchCompleted()
event for the batch was triggered (i.e., when the watermarks were advanced).



Has anyone faced this issue before?
What factors can contribute to this latency?
Thanks for any pointers to help debug the issue.

[1] -
https://github.com/apache/beam/blob/de30361359b70e9fe9729f0f3d52f6c6e8462cfb/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L363
[2] -
http://apache-spark-user-list.1001560.n3.nabble.com/Latency-between-Batch-Completion-and-triggering-of-onBatchCompleted-event-tc37086.html
[3] -
https://github.com/apache/beam/blob/de30361359b70e9fe9729f0f3d52f6c6e8462cfb/runners/spark/src/main/java/org/apache/beam/runners/spark/util/GlobalWatermarkHolder.java#L208

Regards,
Rahul