Posted to user@beam.apache.org by shanta chakpram <sh...@gmail.com> on 2019/09/05 14:17:47 UTC

Watermark is lagging in Spark Runner with kafkaIO

Hi,

We have detected an issue with SparkRunner and Watermark.

*Pipeline*: Read from two Kafka Sources => Apply fixed window of duration 1
minute to both the PCollections => Apply SqlTransform with query "select
c.datetime, c.country ,s.name, s.id from `kafka_source1` as s join
`kafka_source2` as c on s.name = c.name" => write the emitted output to
Kafka Sink

We are using the timestamp policy provided in
https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
We have set maxDelay to 0.
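
For reference, here is a minimal sketch of how one of the sources is wired
up. This is not our exact code: the broker address, topic name and the
event-time parser are placeholders, but the timestamp policy (maxDelay = 0)
and the 1-minute window match the description above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.CustomTimestampPolicyWithLimitedDelay;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaWatermarkSketch {

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    PCollection<KV<String, String>> source1 =
        p.apply("ReadKafkaSource1",
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")   // placeholder
                .withTopic("kafka_source1")            // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // CustomTimestampPolicyWithLimitedDelay with maxDelay = 0
                .withTimestampPolicyFactory((tp, previousWatermark) ->
                    new CustomTimestampPolicyWithLimitedDelay<String, String>(
                        rec -> parseEventTime(rec.getKV().getValue()),
                        Duration.ZERO,
                        previousWatermark))
                .withoutMetadata());

    // The 1-minute fixed window applied before SqlTransform.
    source1.apply(Window.<KV<String, String>>into(
        FixedWindows.of(Duration.standardMinutes(1))));

    p.run().waitUntilFinish();
  }

  // Illustrative helper only: the real code parses the 'datetime' field from
  // the JSON value; returning "now" just keeps the sketch self-contained.
  private static Instant parseEventTime(String value) {
    return Instant.now();
  }
}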

As we have applied a fixed window of 1 minute duration and the elements'
timestamps are monotonically increasing, we expect the output to be emitted
when the current processing time crosses 12-02-00, with a reasonable delay
(say 10 seconds). But we are getting the result of the window only after a
long delay.

From the Spark logs, it seems that the watermark is lagging.
Here are the logs:
19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
highWatermark=2019-09-05T11:57:06.302Z,
synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
highWatermark=2019-09-05T11:57:06.686Z,
synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
19/09/05 12:02:50 INFO
GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
timestamp: 1567684500500 has completed, watermarks have been updated.

As you can see, when the current processing time is 12:02:50, the
highWatermark is 11:57:06.
As the processing time progresses, the gap between the processing time and
the highWatermark keeps increasing.

We ran the same pipeline with the same data on the Flink Runner and the
Direct Runner and did not see this issue. In those runners, the watermark
is almost equal to the processing time.

Sample Input Data :

kafka_source1:
value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}

kafka_source2:
value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
12-01-26 704060'}
value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05 12-01-27
712300'}
value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05 12-01-28
713951'}

What could be the issue here?

Regards,
shanta

Re: Watermark is lagging in Spark Runner with kafkaIO

Posted by rahul patwari <ra...@gmail.com>.
Forgot to mention: A FixedWindow of duration 1 minute is applied before
applying SqlTransform.

On Tue, Sep 10, 2019 at 6:03 PM rahul patwari <ra...@gmail.com>
wrote:

> Hi,
> I am facing this issue too.
> +dev <de...@beam.apache.org>
>
> Here is the Pipeline that we are using(providing a very simple pipeline to
> highlight the issue):
> KafkaSource -> SqlTransform -> KafkaSink
>
> We are reading from a single topic in KafkaSource with a single partition.
>
> Here is the data that we are producing to KafkaSource topic:
> "str1", "2019-09-10 11:36:42"
> "str2", "2019-09-10 11:36:44"
> "str3", "2019-09-10 11:36:45"
>
> The first column name is "strCol".
> The second column, i.e. the timestamp in string format is being used as
> the timestamp of the element.
> The timestamp is the wall time when the record got generated.
> After publishing this data to the Kafka topic, we are not publishing any
> more data. The topic is idle after that.
> The timestamps of the records are monotonically increasing.
>
> Sql query: "select strCol FROM PCOLLECTION GROUP BY strCol"
>
> Here is the result from KafkaSink:
> {"strCol":{"string":"str1"}}
> {"strCol":{"string":"str3"}}
> {"strCol":{"string":"str2"}}
>
> The expected result is written to KafkaSink Correctly, *but with a delay*.
>
> Here are the logs from Spark Driver:
> ...
> 19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
> highWatermark=2019-09-10T11:43:37.273Z,
> synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
> ...
> 19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
> highWatermark=2019-09-10T11:44:54.238Z,
> synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}
>
> As per the logs,
> when the processing time was 12:12:42, the highWatermark was at 11:43:37.
> Almost 30 minutes delay. And
> when the processing time was 12:18:53, the highWatermark was at 11:44:54.
>
> From the above logs, it seems that the watermark is moving slowly.
>
> Is there an IT for SparkRunner with Unbounded data and Windowing
> aggregation?
> Is this a known bug?
>
> Thanks,
> Rahul
>
> On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram <sh...@gmail.com>
> wrote:
>
>> Hi,
>>
>> We have detected an issue with SparkRunner and Watermark.
>>
>> *Pipeline*: Read from two Kafka Sources => Apply fixed window of
>> duration 1 minute to both the PCollections => Apply SqlTransform with query
>> "select c.datetime, c.country ,s.name, s.id from `kafka_source1` as s
>> join `kafka_source2` as c on s.name = c.name" => write the emitted
>> output to Kafka Sink
>>
>> we are using the watermark provided in
>> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
>> We have given maxDelay as 0.
>>
>> As we have applied fixed window of 1 minute duration and as the elements
>> timestamps are monotonically increasing, we are expecting the output to be
>> emitted when the current processing time crosses 12-02-00 with a reasonable
>> delay(say 10 seconds). But, we are getting the result of the window after a
>> long delay.
>>
>> In Spark logs it seems that the watermark is lagging.
>> Here are the logs:
>> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
>> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
>> highWatermark=2019-09-05T11:57:06.302Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
>> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
>> highWatermark=2019-09-05T11:57:06.686Z,
>> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
>> 19/09/05 12:02:50 INFO
>> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
>> timestamp: 1567684500500 has completed, watermarks have been updated.
>>
>> As you can see, when the current processing time is 12:02:50, the
>> highWatermark is 11:57:06.
>> As the processing time progresses, the gap between processing time and
>> highWatermark is increasing.
>>
>> We ran the same pipeline with same data in Flink Runner and Direct Runner
>> and we have not seen this issue. In these runners, we can see that the
>> Watermark is almost equal to Processing time.
>>
>> Sample Input Data :
>>
>> kafka_source1:
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
>> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
>> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>>
>> kafka_source2:
>> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
>> 12-01-26 704060'}
>> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05
>> 12-01-27 712300'}
>> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05
>> 12-01-28 713951'}
>>
>> what can be the issue here?
>>
>> Regards,
>> shanta
>>
>

Re: Unbounded input join Unbounded input then write to Bounded Sink

Posted by Ismaël Mejía <ie...@gmail.com>.
Hello,

Sinks are not bounded or unbounded; they are just normal ParDos (DoFns)
that behave consistently with the pipeline's data. So if your pipeline
deals with unbounded data, the sink will write that data accordingly (when
windows close, triggers fire, etc., i.e. whenever data is ready to be
emitted).

One pattern that was reported with a similar pipeline, but with JdbcIO as
the sink, was connection exhaustion of the database. This happened because
the pipeline was producing lots of windows / materializations, which ended
up creating thousands of DoFn writes to the sink, and every write to the
sink requested a new connection, so the database got overwhelmed. This was
fixed for JdbcIO via a pool of connections and smarter instantiation. If
this is happening with HCatalogIO it is an issue that we Beam devs should
fix, but it should not be a concern for your pipeline design.
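
To illustrate the point, here is a minimal sketch of a sink written as a
plain DoFn (this is not the actual JdbcIO or HCatalogIO code; the JDBC URL
and SQL statement are placeholders). Opening the connection once per DoFn
instance, in @Setup, instead of once per write is what avoids the
connection churn described above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// A sink is just a DoFn: it writes whatever elements reach it, window by
// window and trigger by trigger.
class SimpleJdbcSinkFn extends DoFn<KV<String, String>, Void> {

  private transient Connection connection;

  @Setup
  public void setup() throws Exception {
    // One connection per DoFn instance, not one per element or per window.
    connection = DriverManager.getConnection("jdbc:postgresql://host/db"); // placeholder
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    try (PreparedStatement stmt =
        connection.prepareStatement("INSERT INTO sink_table VALUES (?, ?)")) { // placeholder
      stmt.setString(1, c.element().getKey());
      stmt.setString(2, c.element().getValue());
      stmt.executeUpdate();
    }
  }

  @Teardown
  public void teardown() throws Exception {
    if (connection != null) {
      connection.close();
    }
  }
}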

Regards,
Ismaël


On Tue, Feb 25, 2020 at 3:30 AM rahul patwari <ra...@gmail.com>
wrote:

> Hi Kenn, Rui,
>
> The pipeline that we are trying is exactly what Kenn has mentioned above
> i.e.
> Read From Kafka => Apply Fixed Windows of 1 Min => SqlTransform => Write
> to Hive using HcatalogIO
>
> We are interested in understanding the behaviour when the source is
> Unbounded and Sink is bounded as this pipeline is being used for ETL.
> Does the same pipeline work for any other Bounded Sink, instead of
> HcatalogIO?
> What are the features required to be supported by the Bounded Sink, for it
> to be used along with an Unbounded Source?
>
> Are there any best practices (or) pipeline patterns for these kinds of
> pipelines? Will there be any performance hits?
>
> Regards,
> Rahul
>
> On Tue, Feb 25, 2020 at 6:57 AM Rui Wang <ru...@google.com> wrote:
>
>> Sorry please remove " .apply(Window.into(FixedWindows.of(1 minute))"
>> from the query above.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 5:26 PM Rui Wang <ru...@google.com> wrote:
>>
>>> I see. So I guess I wasn't fully understand the requirement:
>>>
>>> Do you want to have a 1-min window join on two unbounded sources and
>>> write to sink when the window closes ? Or there is an extra requirement
>>> such that you also want to write to sink every minute per window?
>>>
>>> For the former, you can do it by SQL:
>>>
>>>     pipeline.apply(KafkaIO.read() ... )
>>>         .apply(Window.into(FixedWindows.of(1 minute))
>>>         .apply(SqlTransform(
>>>                   "SELECT ... FROM
>>>                         (select TUMBLE_START() as window_start, * FROM
>>> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>>>                   JOIN
>>>                         (select TUMBLE_START() as window_start, * FROM
>>> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>>>                    on table_a.window_start = table_b.window_start ...")
>>>         .apply(HCatalogIO.write() ...)
>>>
>>> But as Kenneth mentioned HCatalogIO might not work as expected.
>>>
>>>
>>>
>>> For the latter, the mixed Java and SQL pipeline won't help you.
>>>
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <ke...@apache.org> wrote:
>>>
>>>> I think actually it depends on the pipeline. You cannot do it all in
>>>> SQL, but if you mix Java and SQL I think you can do this. If you write this:
>>>>
>>>>     pipeline.apply(KafkaIO.read() ... )
>>>>         .apply(Window.into(FixedWindows.of(1 minute))
>>>>         .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>>>>         .apply(HCatalogIO.write() ...)
>>>>
>>>> This should apply the SQL on each window. When the SQL does not do any
>>>> windowing, it is required to be a "per-window" SQL execution. That is the
>>>> spec for SqlTransform. If that does not work, please report your experience.
>>>>
>>>> But the SQL semantics do not require waiting. Today the
>>>> stream-to-stream join will do a CoGroupByKey so it will wait. But SQL may
>>>> in the future adopt a better join for this case that can output records
>>>> with lower latency.
>>>>
>>>> It may be a bigger question whether HCatalogIO.write() has all the
>>>> knobs you would like.
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ru...@google.com> wrote:
>>>>
>>>>> SQL does not support such joins with your requirement: write to sink
>>>>> after every 1 min after window closes.
>>>>>
>>>>> You might can use state and timer API to achieve your goal.
>>>>>
>>>>>
>>>>>
>>>>> -Rui
>>>>>
>>>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>>>> shantachakpram@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I am trying to join inputs from Unbounded Sources then write to
>>>>>> Bounded Sink.
>>>>>> The pipeline I'm trying is:
>>>>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>>>>  And, a FixedWindow of 1 minute duration is applied.
>>>>>>
>>>>>> I'm expecting the inputs from unbounded sources joined within the
>>>>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>>>>> after each window interval.
>>>>>>
>>>>>> Can someone please tell if this is a valid scenario and what is the
>>>>>> expected behaviour from this kind of scenario?
>>>>>>
>>>>>> Regards,
>>>>>> Shanta
>>>>>>
>>>>>

Re: Unbounded input join Unbounded input then write to Bounded Sink

Posted by rahul patwari <ra...@gmail.com>.
Hi Kenn, Rui,

The pipeline that we are trying is exactly what Kenn has mentioned above,
i.e.:
Read from Kafka => Apply fixed windows of 1 min => SqlTransform => Write to
Hive using HCatalogIO

We are interested in understanding the behaviour when the source is
unbounded and the sink is bounded, as this pipeline is being used for ETL.
Does the same pipeline work for any other bounded sink instead of
HCatalogIO?
What features does a bounded sink need to support for it to be used along
with an unbounded source?

Are there any best practices or pipeline patterns for these kinds of
pipelines? Will there be any performance hits?

Regards,
Rahul

On Tue, Feb 25, 2020 at 6:57 AM Rui Wang <ru...@google.com> wrote:

> Sorry please remove " .apply(Window.into(FixedWindows.of(1 minute))" from
> the query above.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:26 PM Rui Wang <ru...@google.com> wrote:
>
>> I see. So I guess I wasn't fully understand the requirement:
>>
>> Do you want to have a 1-min window join on two unbounded sources and
>> write to sink when the window closes ? Or there is an extra requirement
>> such that you also want to write to sink every minute per window?
>>
>> For the former, you can do it by SQL:
>>
>>     pipeline.apply(KafkaIO.read() ... )
>>         .apply(Window.into(FixedWindows.of(1 minute))
>>         .apply(SqlTransform(
>>                   "SELECT ... FROM
>>                         (select TUMBLE_START() as window_start, * FROM
>> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>>                   JOIN
>>                         (select TUMBLE_START() as window_start, * FROM
>> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>>                    on table_a.window_start = table_b.window_start ...")
>>         .apply(HCatalogIO.write() ...)
>>
>> But as Kenneth mentioned HCatalogIO might not work as expected.
>>
>>
>>
>> For the latter, the mixed Java and SQL pipeline won't help you.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <ke...@apache.org> wrote:
>>
>>> I think actually it depends on the pipeline. You cannot do it all in
>>> SQL, but if you mix Java and SQL I think you can do this. If you write this:
>>>
>>>     pipeline.apply(KafkaIO.read() ... )
>>>         .apply(Window.into(FixedWindows.of(1 minute))
>>>         .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>>>         .apply(HCatalogIO.write() ...)
>>>
>>> This should apply the SQL on each window. When the SQL does not do any
>>> windowing, it is required to be a "per-window" SQL execution. That is the
>>> spec for SqlTransform. If that does not work, please report your experience.
>>>
>>> But the SQL semantics do not require waiting. Today the stream-to-stream
>>> join will do a CoGroupByKey so it will wait. But SQL may in the future
>>> adopt a better join for this case that can output records with lower
>>> latency.
>>>
>>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>>> you would like.
>>>
>>> Kenn
>>>
>>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ru...@google.com> wrote:
>>>
>>>> SQL does not support such joins with your requirement: write to sink
>>>> after every 1 min after window closes.
>>>>
>>>> You might can use state and timer API to achieve your goal.
>>>>
>>>>
>>>>
>>>> -Rui
>>>>
>>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>>> shantachakpram@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am trying to join inputs from Unbounded Sources then write to
>>>>> Bounded Sink.
>>>>> The pipeline I'm trying is:
>>>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>>>  And, a FixedWindow of 1 minute duration is applied.
>>>>>
>>>>> I'm expecting the inputs from unbounded sources joined within the
>>>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>>>> after each window interval.
>>>>>
>>>>> Can someone please tell if this is a valid scenario and what is the
>>>>> expected behaviour from this kind of scenario?
>>>>>
>>>>> Regards,
>>>>> Shanta
>>>>>
>>>>

Re: Unbounded input join Unbounded input then write to Bounded Sink

Posted by Rui Wang <ru...@google.com>.
Sorry, please remove ".apply(Window.into(FixedWindows.of(1 minute))" from
the snippet above.



-Rui

On Mon, Feb 24, 2020 at 5:26 PM Rui Wang <ru...@google.com> wrote:

> I see. So I guess I wasn't fully understand the requirement:
>
> Do you want to have a 1-min window join on two unbounded sources and write
> to sink when the window closes ? Or there is an extra requirement such that
> you also want to write to sink every minute per window?
>
> For the former, you can do it by SQL:
>
>     pipeline.apply(KafkaIO.read() ... )
>         .apply(Window.into(FixedWindows.of(1 minute))
>         .apply(SqlTransform(
>                   "SELECT ... FROM
>                         (select TUMBLE_START() as window_start, * FROM
> stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_a
>                   JOIN
>                         (select TUMBLE_START() as window_start, * FROM
> stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE))  as table_b
>                    on table_a.window_start = table_b.window_start ...")
>         .apply(HCatalogIO.write() ...)
>
> But as Kenneth mentioned HCatalogIO might not work as expected.
>
>
>
> For the latter, the mixed Java and SQL pipeline won't help you.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <ke...@apache.org> wrote:
>
>> I think actually it depends on the pipeline. You cannot do it all in SQL,
>> but if you mix Java and SQL I think you can do this. If you write this:
>>
>>     pipeline.apply(KafkaIO.read() ... )
>>         .apply(Window.into(FixedWindows.of(1 minute))
>>         .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>>         .apply(HCatalogIO.write() ...)
>>
>> This should apply the SQL on each window. When the SQL does not do any
>> windowing, it is required to be a "per-window" SQL execution. That is the
>> spec for SqlTransform. If that does not work, please report your experience.
>>
>> But the SQL semantics do not require waiting. Today the stream-to-stream
>> join will do a CoGroupByKey so it will wait. But SQL may in the future
>> adopt a better join for this case that can output records with lower
>> latency.
>>
>> It may be a bigger question whether HCatalogIO.write() has all the knobs
>> you would like.
>>
>> Kenn
>>
>> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ru...@google.com> wrote:
>>
>>> SQL does not support such joins with your requirement: write to sink
>>> after every 1 min after window closes.
>>>
>>> You might can use state and timer API to achieve your goal.
>>>
>>>
>>>
>>> -Rui
>>>
>>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <
>>> shantachakpram@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>>> Sink.
>>>> The pipeline I'm trying is:
>>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>>  And, a FixedWindow of 1 minute duration is applied.
>>>>
>>>> I'm expecting the inputs from unbounded sources joined within the
>>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>>> after each window interval.
>>>>
>>>> Can someone please tell if this is a valid scenario and what is the
>>>> expected behaviour from this kind of scenario?
>>>>
>>>> Regards,
>>>> Shanta
>>>>
>>>

Re: Unbounded input join Unbounded input then write to Bounded Sink

Posted by Rui Wang <ru...@google.com>.
I see. So I guess I didn't fully understand the requirement:

Do you want to have a 1-min window join on two unbounded sources and write
to the sink when the window closes? Or is there an extra requirement such
that you also want to write to the sink every minute per window?

For the former, you can do it in SQL:

    pipeline.apply(KafkaIO.read() ... )
        .apply(Window.into(FixedWindows.of(1 minute)))
        .apply(SqlTransform.query(
            "SELECT ... FROM
                 (SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, * FROM
                  stream1 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_a
             JOIN
                 (SELECT TUMBLE_START(ts, INTERVAL '1' MINUTE) AS window_start, * FROM
                  stream2 GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE)) AS table_b
             ON table_a.window_start = table_b.window_start ..."))
        .apply(HCatalogIO.write() ...)

But as Kenneth mentioned, HCatalogIO might not work as expected.



For the latter, the mixed Java and SQL pipeline won't help you.



-Rui

On Mon, Feb 24, 2020 at 5:04 PM Kenneth Knowles <ke...@apache.org> wrote:

> I think actually it depends on the pipeline. You cannot do it all in SQL,
> but if you mix Java and SQL I think you can do this. If you write this:
>
>     pipeline.apply(KafkaIO.read() ... )
>         .apply(Window.into(FixedWindows.of(1 minute))
>         .apply(SqlTransform("SELECT ... FROM stream1 JOIN stream2 ...")
>         .apply(HCatalogIO.write() ...)
>
> This should apply the SQL on each window. When the SQL does not do any
> windowing, it is required to be a "per-window" SQL execution. That is the
> spec for SqlTransform. If that does not work, please report your experience.
>
> But the SQL semantics do not require waiting. Today the stream-to-stream
> join will do a CoGroupByKey so it will wait. But SQL may in the future
> adopt a better join for this case that can output records with lower
> latency.
>
> It may be a bigger question whether HCatalogIO.write() has all the knobs
> you would like.
>
> Kenn
>
> On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ru...@google.com> wrote:
>
>> SQL does not support such joins with your requirement: write to sink
>> after every 1 min after window closes.
>>
>> You might can use state and timer API to achieve your goal.
>>
>>
>>
>> -Rui
>>
>> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <sh...@gmail.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am trying to join inputs from Unbounded Sources then write to Bounded
>>> Sink.
>>> The pipeline I'm trying is:
>>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>>  And, a FixedWindow of 1 minute duration is applied.
>>>
>>> I'm expecting the inputs from unbounded sources joined within the
>>> current window to be written to the HCatalogIO Sink after every 1 min i.e
>>> after each window interval.
>>>
>>> Can someone please tell if this is a valid scenario and what is the
>>> expected behaviour from this kind of scenario?
>>>
>>> Regards,
>>> Shanta
>>>
>>

Re: Unbounded input join Unbounded input then write to Bounded Sink

Posted by Kenneth Knowles <ke...@apache.org>.
I think actually it depends on the pipeline. You cannot do it all in SQL,
but if you mix Java and SQL I think you can do this. If you write this:

    pipeline.apply(KafkaIO.read() ... )
        .apply(Window.into(FixedWindows.of(1 minute)))
        .apply(SqlTransform.query("SELECT ... FROM stream1 JOIN stream2 ..."))
        .apply(HCatalogIO.write() ...)

This should apply the SQL on each window. When the SQL does not do any
windowing, it is required to be a "per-window" SQL execution. That is the
spec for SqlTransform. If that does not work, please report your experience.

But the SQL semantics do not require waiting. Today the stream-to-stream
join will do a CoGroupByKey so it will wait. But SQL may in the future
adopt a better join for this case that can output records with lower
latency.

It may be a bigger question whether HCatalogIO.write() has all the knobs
you would like.

Kenn

On Mon, Feb 24, 2020 at 12:14 PM Rui Wang <ru...@google.com> wrote:

> SQL does not support such joins with your requirement: write to sink after
> every 1 min after window closes.
>
> You might can use state and timer API to achieve your goal.
>
>
>
> -Rui
>
> On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <sh...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am trying to join inputs from Unbounded Sources then write to Bounded
>> Sink.
>> The pipeline I'm trying is:
>> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>>  And, a FixedWindow of 1 minute duration is applied.
>>
>> I'm expecting the inputs from unbounded sources joined within the current
>> window to be written to the HCatalogIO Sink after every 1 min i.e after
>> each window interval.
>>
>> Can someone please tell if this is a valid scenario and what is the
>> expected behaviour from this kind of scenario?
>>
>> Regards,
>> Shanta
>>
>

Re: Unbounded input join Unbounded input then write to Bounded Sink

Posted by Rui Wang <ru...@google.com>.
SQL does not support such joins with your requirement: writing to the sink
every 1 minute, after each window closes.

You might be able to use the State and Timer APIs to achieve your goal.
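
For example, here is a minimal sketch of a stateful DoFn that buffers
values and flushes them when an event-time timer fires at the end of the
window. The names and the output logic are illustrative only, not a full
join implementation:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;

// Buffers the values seen for a key and emits them once the watermark
// passes the end of the current (1-minute) window.
class BufferAndFlushFn extends DoFn<KV<String, String>, KV<String, Iterable<String>>> {

  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      ProcessContext c,
      BoundedWindow window,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(c.element().getValue());
    // Fire when the event-time watermark reaches the end of this window.
    flush.set(window.maxTimestamp());
  }

  @OnTimer("flush")
  public void onFlush(OnTimerContext c, @StateId("buffer") BagState<String> buffer) {
    List<String> values = new ArrayList<>();
    buffer.read().forEach(values::add);
    c.output(KV.of("key", values)); // "key" is an illustrative placeholder
    buffer.clear();
  }
}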



-Rui

On Mon, Feb 24, 2020 at 9:50 AM shanta chakpram <sh...@gmail.com>
wrote:

> Hi,
>
> I am trying to join inputs from Unbounded Sources then write to Bounded
> Sink.
> The pipeline I'm trying is:
> Kafka Sources -> SqlTransform -> HCatalogIO  Sink
>  And, a FixedWindow of 1 minute duration is applied.
>
> I'm expecting the inputs from unbounded sources joined within the current
> window to be written to the HCatalogIO Sink after every 1 min i.e after
> each window interval.
>
> Can someone please tell if this is a valid scenario and what is the
> expected behaviour from this kind of scenario?
>
> Regards,
> Shanta
>

Unbounded input join Unbounded input then write to Bounded Sink

Posted by shanta chakpram <sh...@gmail.com>.
Hi,

I am trying to join inputs from unbounded sources and then write to a
bounded sink.
The pipeline I'm trying is:
Kafka Sources -> SqlTransform -> HCatalogIO Sink
And a FixedWindow of 1 minute duration is applied.
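
For reference, here is a minimal sketch of the write step I have in mind.
The metastore URI, database and table names are placeholders, and the
conversion of the joined SQL Rows into HCatRecords is assumed to happen
upstream:

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.hcatalog.HCatalogIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hive.hcatalog.data.HCatRecord;

class HCatalogSinkSketch {

  // joinedRecords: the joined, windowed output already converted to HCatRecords.
  static void writeToHive(PCollection<HCatRecord> joinedRecords) {
    Map<String, String> configProperties = new HashMap<>();
    configProperties.put("hive.metastore.uris", "thrift://metastore-host:9083"); // placeholder

    joinedRecords.apply(
        HCatalogIO.write()
            .withConfigProperties(configProperties)
            .withDatabase("default")        // placeholder
            .withTable("joined_output"));   // placeholder
  }
}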

I'm expecting the inputs from the unbounded sources, joined within the
current window, to be written to the HCatalogIO sink after every 1 min,
i.e. after each window interval.

Can someone please tell us whether this is a valid scenario, and what the
expected behaviour is in this kind of scenario?

Regards,
Shanta

Re: Watermark is lagging in Spark Runner with kafkaIO

Posted by rahul patwari <ra...@gmail.com>.
Hi,
I am facing this issue too.
+dev <de...@beam.apache.org>

Here is the pipeline that we are using (providing a very simple pipeline to
highlight the issue):
KafkaSource -> SqlTransform -> KafkaSink

We are reading from a single topic in KafkaSource with a single partition.

Here is the data that we are producing to KafkaSource topic:
"str1", "2019-09-10 11:36:42"
"str2", "2019-09-10 11:36:44"
"str3", "2019-09-10 11:36:45"

The first column's name is "strCol".
The second column, i.e. the timestamp in string format, is being used as
the timestamp of the element.
The timestamp is the wall time when the record was generated.
After publishing this data to the Kafka topic, we are not publishing any
more data; the topic is idle after that.
The timestamps of the records are monotonically increasing.

Sql query: "select strCol FROM PCOLLECTION GROUP BY strCol"
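
For context, here is a minimal sketch of how this SqlTransform step is
applied. The conversion of the Kafka records into Rows with the "strCol"
schema and the element timestamp assignment are assumed to happen upstream:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Duration;

class SqlGroupBySketch {

  // The single-column schema used by the query below.
  static final Schema SCHEMA = Schema.builder().addStringField("strCol").build();

  static PCollection<Row> groupByStrCol(PCollection<Row> rows) {
    // rows: the Kafka input converted to Rows with SCHEMA, with element
    // timestamps already taken from the second column.
    return rows
        .apply(Window.<Row>into(FixedWindows.of(Duration.standardMinutes(1))))
        // The windowed PCollection becomes the PCOLLECTION table in the query.
        .apply(SqlTransform.query("select strCol FROM PCOLLECTION GROUP BY strCol"));
  }
}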

Here is the result from KafkaSink:
{"strCol":{"string":"str1"}}
{"strCol":{"string":"str3"}}
{"strCol":{"string":"str2"}}

The expected result is written to KafkaSink correctly, *but with a delay*.

Here are the logs from Spark Driver:
...
19/09/10 12:12:42 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-10T11:43:37.273Z,
highWatermark=2019-09-10T11:43:37.273Z,
synchronizedProcessingTime=2019-09-10T11:40:33.000Z}}
...
19/09/10 12:18:53 INFO GlobalWatermarkHolder: Put new watermark block:
{0=SparkWatermarks{lowWatermark=2019-09-10T11:44:54.238Z,
highWatermark=2019-09-10T11:44:54.238Z,
synchronizedProcessingTime=2019-09-10T11:41:17.000Z}}

As per the logs,
when the processing time was 12:12:42, the highWatermark was at 11:43:37.
Almost 30 minutes delay. And
when the processing time was 12:18:53, the highWatermark was at 11:44:54.

From the above logs, it seems that the watermark is moving slowly.

Is there an integration test (IT) for the SparkRunner with unbounded data
and windowed aggregation?
Is this a known bug?

Thanks,
Rahul

On Thu, Sep 5, 2019 at 7:48 PM shanta chakpram <sh...@gmail.com>
wrote:

> Hi,
>
> We have detected an issue with SparkRunner and Watermark.
>
> *Pipeline*: Read from two Kafka Sources => Apply fixed window of duration
> 1 minute to both the PCollections => Apply SqlTransform with query "select
> c.datetime, c.country ,s.name, s.id from `kafka_source1` as s join
> `kafka_source2` as c on s.name = c.name" => write the emitted output to
> Kafka Sink
>
> we are using the watermark provided in
> https://github.com/apache/beam/blob/8869fcebdb9ddff375d8e7b408f1d971e1257815/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java#L74.
> We have given maxDelay as 0.
>
> As we have applied fixed window of 1 minute duration and as the elements
> timestamps are monotonically increasing, we are expecting the output to be
> emitted when the current processing time crosses 12-02-00 with a reasonable
> delay(say 10 seconds). But, we are getting the result of the window after a
> long delay.
>
> In Spark logs it seems that the watermark is lagging.
> Here are the logs:
> 19/09/05 12:02:50 INFO GlobalWatermarkHolder: Put new watermark block:
> {0=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.558Z,
> highWatermark=2019-09-05T11:57:06.302Z,
> synchronizedProcessingTime=2019-09-05T11:55:00.500Z},
> 1=SparkWatermarks{lowWatermark=2019-09-05T11:57:05.120Z,
> highWatermark=2019-09-05T11:57:06.686Z,
> synchronizedProcessingTime=2019-09-05T11:55:00.500Z}}
> 19/09/05 12:02:50 INFO
> GlobalWatermarkHolder$WatermarkAdvancingStreamingListener: Batch with
> timestamp: 1567684500500 has completed, watermarks have been updated.
>
> As you can see, when the current processing time is 12:02:50, the
> highWatermark is 11:57:06.
> As the processing time progresses, the gap between processing time and
> highWatermark is increasing.
>
> We ran the same pipeline with same data in Flink Runner and Direct Runner
> and we have not seen this issue. In these runners, we can see that the
> Watermark is almost equal to Processing time.
>
> Sample Input Data :
>
> kafka_source1:
> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-19 481704'}
> value:{'id': 1, 'name': 'test1', 'datetime': '2019-09-05 12-01-20 491764'}
> value:{'id': 1, 'name': 'test0', 'datetime': '2019-09-05 12-01-21 493494'}
>
> kafka_source2:
> value:{'country': 'India', 'name': 'test0', 'datetime': '2019-09-05
> 12-01-26 704060'}
> value:{'country': 'USA', 'name': 'test1', 'datetime': '2019-09-05 12-01-27
> 712300'}
> value:{'country': 'USA', 'name': 'test2', 'datetime': '2019-09-05 12-01-28
> 713951'}
>
> what can be the issue here?
>
> Regards,
> shanta
>
