Posted to user@beam.apache.org by Lydian <ly...@gmail.com> on 2023/02/17 01:00:56 UTC

Backup event from Kafka to S3 in parquet format every minute

I want to build a simple Beam pipeline that stores the events from
Kafka to S3 in Parquet format every minute.

Here's a simplified version of my pipeline:

from typing import Any

import apache_beam as beam
from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka


def add_timestamp(event: Any) -> Any:
    from datetime import datetime
    from apache_beam import window

    # Attach the event time (seconds since the epoch) to the element.
    return window.TimestampedValue(event, datetime.timestamp(event[1].timestamp))


# Actual Pipeline
(
  pipeline
  | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
  | "Transformed" >> beam.Map(my_transform)
  | "Add timestamp" >> beam.Map(add_timestamp)
  | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
  | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
)
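
A side note on the "Add timestamp" step: `TimestampedValue` accepts seconds since the epoch, and `datetime.timestamp()` interprets a naive `datetime` in the local time zone of whichever machine runs it. The following is only a minimal sketch, assuming the event time is a `datetime` and that naive values are meant as UTC; `to_epoch_seconds` is a hypothetical helper, not part of Beam.

```python
from datetime import datetime, timezone


def to_epoch_seconds(event_time: datetime) -> float:
    """Convert a datetime to POSIX seconds.

    Assumption: naive datetimes are meant as UTC. Without this step,
    datetime.timestamp() would use the worker's local time zone.
    """
    if event_time.tzinfo is None:
        event_time = event_time.replace(tzinfo=timezone.utc)
    return event_time.timestamp()
```

The returned value could then be passed as the second argument of `window.TimestampedValue`, so that the one-minute windows line up the same way on every worker.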

However, the pipeline failed with

GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger

This seems to be coming from
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
which always applies GlobalWindows and thus causes this error. I am wondering
what I should do to correctly back up the events from Kafka (unbounded) to S3.
Thanks!

By the way, I am running on the PortableRunner with Flink. The Beam version is
2.41.0 (the latest version seems to have the same code) and the Flink version
is 1.14.5.



Sincerely,
Lydian Lee

Re: Backup event from Kafka to S3 in parquet format every minute

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi Alexey,

I am just learning Beam and doing a POC that requires fetching streaming
data from Pub/Sub and partitioning it on GCS as Parquet files with a fixed
window. The catch is that I have an additional requirement to use ONLY SQL.

I did not manage to do it. My solutions either ran indefinitely or
failed with `GroupByKey cannot be applied to an unbounded PCollection
with global windowing and a default trigger` despite having a window
definition in the exact same CTE. You can find exactly what I tried
here: https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph

Here my knowledge of Beam ends. My hypothesis is that `WriteToParquet`
supports only bounded data and does not automatically partition data. So
the OP could use it by batching the data in memory (into individual
windows) and then applying `WriteToParquet` to each collected batch
individually. But this is more of a guess than knowledge. Please let me
know if this is not correct.
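
To make the "batching into individual windows" idea concrete, here is a small plain-Python sketch of how fixed one-minute windows bucket timestamped events. It is not Beam code; `window_start` and `batch_by_window` are hypothetical names, and the arithmetic assumes fixed windows aligned to the epoch (offset 0).

```python
from collections import defaultdict


def window_start(timestamp_s: float, size_s: int = 60) -> float:
    """Start of the fixed window [start, start + size_s) containing the timestamp."""
    return timestamp_s - (timestamp_s % size_s)


def batch_by_window(events, size_s: int = 60) -> dict:
    """Group (timestamp_s, payload) pairs into one list per window start."""
    batches = defaultdict(list)
    for timestamp_s, payload in events:
        batches[window_start(timestamp_s, size_s)].append(payload)
    return dict(batches)
```

Each resulting batch is bounded, so it could be written out as a separate Parquet file, which is the per-window mapping suggested above.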

I cannot test this in my own solution, as I am limited to pure SQL, where
I can only play with the table definition. I did not see any table
parameters that could be responsible for partitioning. If there are any,
please let me know.

If you remember that it is possible to read or write partitioned
Parquet files with just a `PTransform`, that's great! I probably
made some minor mistake in my trials, and I am eager to learn what the
mistake was.

Best regards

Wiśniowski Piotr



I did find a solution like this one:
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java.
This could probably help the OP, since the OP tried to save to lvl `PCollection`
as `Parquet` instead of saving each partition separately, as stated in
`WriteOneFilePer


Re: Backup event from Kafka to S3 in parquet format every minute

Posted by Alexey Romanenko <ar...@gmail.com>.
Piotr,

> On 17 Feb 2023, at 09:48, Wiśniowski Piotr <co...@gmail.com> wrote:
> Does this mean that Parquet IO does not support partitioning, and we need to do some workarounds? Like explicitly mapping each window to a separate Parquet file?
> 

Could you elaborate a bit more on this? IIRC, we used to read partitioned Parquet files with ParquetIO while running the TPC-DS benchmark.

—
Alexey

Re: Backup event from Kafka to S3 in parquet format every minute

Posted by Lydian <ly...@gmail.com>.
I've tested the same window, trigger, and allowed_lateness settings, but
nothing seems to work. I think the main issue is in the WriteImpl code that
I linked earlier.

https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1033
According to the doc:
> Currently only batch workflows support custom sinks.
This makes me believe that it probably only supports writing in batch mode
(bounded data sources). I am not sure whether there is any plan to fix that,
so I filed a feature request: https://github.com/apache/beam/issues/25598
If anyone else is also suffering from this issue, please comment there as
well to draw more attention to it.


In the meantime, I also found an example that could probably serve as a
workaround for Python pipelines:
https://www.programcreek.com/python/?code=GoogleCloudPlatform/python-docs-samples/python-docs-samples-master/pubsub/streaming-analytics/PubSubToGCS.py
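
The general shape of such a workaround, as far as I understand it, is to window the stream, group each window under a key, and write every group yourself instead of going through `WriteToParquet`. Below is a rough, untested sketch along those lines for Parquet. It assumes the elements are already timestamped dicts matching `schema`, that pyarrow 7 or newer is available (for `Table.from_pylist`), and that the target filesystem is reachable through Beam's `FileSystems`. `window_file_path` and `write_windowed_parquet` are hypothetical names, and nothing here has been verified on Flink or S3.

```python
from datetime import datetime, timezone


def window_file_path(prefix: str, start_s: float, end_s: float) -> str:
    """Build one object path per window from its start and end (epoch seconds)."""
    fmt = "%Y%m%dT%H%M%S"
    start = datetime.fromtimestamp(start_s, tz=timezone.utc).strftime(fmt)
    end = datetime.fromtimestamp(end_s, tz=timezone.utc).strftime(fmt)
    return f"{prefix.rstrip('/')}/{start}-{end}.parquet"


def write_windowed_parquet(events, prefix, schema, window_seconds=60):
    """Apply to a timestamped PCollection of dicts; writes one file per window."""
    # Imported lazily, in the same style as add_timestamp in the original post.
    import apache_beam as beam
    from apache_beam.io.filesystems import FileSystems
    from apache_beam.transforms import window

    def write_batch(keyed_rows, win=beam.DoFn.WindowParam):
        import io
        import pyarrow as pa
        import pyarrow.parquet as pq

        _, rows = keyed_rows
        table = pa.Table.from_pylist(list(rows), schema=schema)
        # Serialize in memory first, then hand the bytes to Beam's filesystem layer.
        buffer = io.BytesIO()
        pq.write_table(table, buffer)
        path = window_file_path(prefix, float(win.start), float(win.end))
        handle = FileSystems.create(path)
        try:
            handle.write(buffer.getvalue())
        finally:
            handle.close()
        return path

    return (
        events
        | "Window" >> beam.WindowInto(window.FixedWindows(window_seconds))
        | "Single key" >> beam.WithKeys(lambda _: 0)  # one group per window
        | "Group per window" >> beam.GroupByKey()
        | "Write one file per window" >> beam.Map(write_batch)
    )
```

Because the `GroupByKey` runs after `WindowInto`, it no longer sees global windowing. The single key sends a whole window to one worker, so for larger volumes it may be better to spread the load over several shard keys and include the key in the file name.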







Sincerely,
Lydian Lee




Re: Backup event from Kafka to S3 in parquet format every minute

Posted by Pavel Solomin <p....@gmail.com>.
For me, this use case worked with the following window definition, which took
a bit of trial and error; I can't claim I fully understand the windowing
logic.

Here's my Java code for Kinesis -> Parquet files that worked:
https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56

I hope it's not hard to derive a Beam Python window config from it.

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>






Re: Backup event from Kafka to S3 in parquet format every minute

Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi,

This sounds like the exact problem I posted about a few emails ago:
https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph

Does this mean that Parquet IO does not support partitioning, and we
need to do some workarounds? Like explicitly mapping each window to a
separate Parquet file? This could be a solution in your case, if it
works (it is just an idea worth trying; I have not tested it and do not
have enough experience with Beam), but I am limited to pure SQL and am not
sure how I can do it there.

I hope this helps with your problem, and that Beam support can find a
solution for my case too.

Best

Wiśniowski Piotr
