You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Lydian <ly...@gmail.com> on 2023/02/17 01:00:56 UTC
Backup event from Kafka to S3 in parquet format every minute
I want to make a simple Beam pipeline which will store the events from
kafka to S3 in parquet format every minute.
Here's a simplified version of my pipeline:
def add_timestamp(event: Any) -> Any:
from datetime import datetime
from apache_beam import window
return window.TimestampedValue(event,
datetime.timestamp(event[1].timestamp))
# Actual Pipeline
(
pipeline
| "Read from Kafka" >> ReadFromKafka(consumer_config, topics,
with_metadata=False)
| "Transformed" >> beam.Map(my_transform)
| "Add timestamp" >> beam.Map(add_timestamp)
| "window" >> beam.WindowInto(window.FixedWindows(60)) # 1 mins
| "writing to parquet" >>
beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
)
However, the pipeline failed with
GroupByKey cannot be applied to an unbounded PCollection with global
windowing and a default trigger
This seems to be coming from
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
which
always add a GlobalWindows and thus causing this error. Wondering what I
should do to correctly backup the event from Kafka (Unbounded) to S3.
Thanks!
btw, I am running with portableRunner with Flink. Beam Version is 2.41.0
(the latest version seems to have the same code) and Flink version is 1.14.5
Sincerely,
Lydian Lee
Re: Backup event from Kafka to S3 in parquet format every minute
Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi Alexey,
I am just learning Beam and doing POC that requires fetching stream data
from PubSub and partitioning it on gs as parquet files with constant
window. The thing is I have additional requirement to use ONLY SQL.
I did not manage to do it. My solutions either worked indefinitely or
failed with `GroupByKey cannot be applied to an unbounded PCollection
with global windowing and a default trigger` despite having window
definition in the exact same CTE. Exactly what I tried You can find
here: https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
Here my knowledge of Beam ends. My hypothesis: is that `WriteToParquet`
supports only bounded data and does not automatically partition data. So
in OP could use it by batching the data in memory (into individual
windows) and then applying `WriteToParquet` to each collected batch
individually. But this is more like guess than knowledge. Please let me
know if this is not correct.
On my solution I cannot test it as I am limited only to pure SQL, where
I can only play with a table definition. But did not see any table
parameters that could be responsible for partitioning. If there are
please let me know.
If You remember that It is possible to read or write to partitioned
Parquet files as just `PTransform` that's great! I probably must have
made some minor mistake in my trials. But eager to learn what was the
mistake.
Best regards
Wiśniowski Piotr
I did find solution like this one:
https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java.
This probably could help OP since OP tried to save to lvl `PCollection`
as `Parquet` instead of saving each partition separately like stated in
`WriteOneFilePer
On 17.02.2023 18:38, Alexey Romanenko wrote:
> Piotr,
>
>> On 17 Feb 2023, at 09:48, Wiśniowski Piotr
>> <co...@gmail.com> wrote:
>>
>> Does this mean that Parquet IO does not support partitioning, and we
>> need to do some workarounds? Like explicitly mapping each window to a
>> separate Parquet file?
>>
>
> Could you elaborate a bit more on this? IIRC, we used to read
> partitioned Parquet files with ParquetIO while running TPC-DS benchmark.
>
> —
> Alexey
Re: Backup event from Kafka to S3 in parquet format every minute
Posted by Alexey Romanenko <ar...@gmail.com>.
Piotr,
> On 17 Feb 2023, at 09:48, Wiśniowski Piotr <co...@gmail.com> wrote:
> Does this mean that Parquet IO does not support partitioning, and we need to do some workarounds? Like explicitly mapping each window to a separate Parquet file?
>
Could you elaborate a bit more on this? IIRC, we used to read partitioned Parquet files with ParquetIO while running TPC-DS benchmark.
—
Alexey
Re: Backup event from Kafka to S3 in parquet format every minute
Posted by Lydian <ly...@gmail.com>.
I've tested the same window, trigger, allowed_lateness nothing seems to
work. I think the main issue is in the writerimpl which I linked earlier.
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1033
According
to the doc:
> Currently only batch workflows support custom sinks.
Which makes me believe that it probably only supports write with batch
(bounded data source) Not sure if there's any plan to fix that, so I filed
a feature request: https://github.com/apache/beam/issues/25598 If anyone
who is also suffer on this issue, please also comment to get more attention
on this issue.
In the meantime, I also found an example that probably could be a
workaround for python pipelines:
https://www.programcreek.com/python/?code=GoogleCloudPlatform/python-docs-samples/python-docs-samples-master/pubsub/streaming-analytics/PubSubToGCS.py
Sincerely,
Lydian Lee
On Fri, Feb 17, 2023 at 2:12 AM Pavel Solomin <p....@gmail.com> wrote:
> For me this use-case worked with the following window definition, which
> was a bit of try-and-fail, and I can't claim I got 100% understanding of
> windowing logic.
>
> Here's my java code for Kinesis -> Parquet files which worked:
> https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56
>
> I hope it's not hard to derive beam-python window config from it.
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 17 Feb 2023 at 08:49, Wiśniowski Piotr <
> contact.wisniowskipiotr@gmail.com> wrote:
>
>> Hi,
>>
>> Sounds like exact problem that I have few emails before -
>> https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
>>
>> Does this mean that Parquet IO does not support partitioning, and we need
>> to do some workarounds? Like explicitly mapping each window to a separate
>> Parquet file? This could be a solution in Your case, if it works (just idea
>> worth trying but did not test it and do not have enough experience with
>> Beam), but I am limited only to pure SQL and not sure how I can do it.
>>
>> Hope This helps with Your problem and Beam support could find some
>> solution to my case too.
>>
>> Best
>>
>> Wiśniowski Piotr
>> On 17.02.2023 02:00, Lydian wrote:
>>
>> I want to make a simple Beam pipeline which will store the events from
>> kafka to S3 in parquet format every minute.
>>
>> Here's a simplified version of my pipeline:
>>
>> def add_timestamp(event: Any) -> Any:
>> from datetime import datetime
>> from apache_beam import window
>>
>> return window.TimestampedValue(event, datetime.timestamp(event[1].timestamp))
>> # Actual Pipeline
>> (
>> pipeline
>> | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
>> | "Transformed" >> beam.Map(my_transform)
>> | "Add timestamp" >> beam.Map(add_timestamp)
>> | "window" >> beam.WindowInto(window.FixedWindows(60)) # 1 mins
>> | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
>> )
>>
>> However, the pipeline failed with
>>
>> GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger
>>
>> This seems to be coming from
>> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146 which
>> always add a GlobalWindows and thus causing this error. Wondering what I
>> should do to correctly backup the event from Kafka (Unbounded) to S3.
>> Thanks!
>>
>> btw, I am running with portableRunner with Flink. Beam Version is 2.41.0
>> (the latest version seems to have the same code) and Flink version is 1.14.5
>>
>>
>>
>> Sincerely,
>> Lydian Lee
>>
>>
Re: Backup event from Kafka to S3 in parquet format every minute
Posted by Pavel Solomin <p....@gmail.com>.
For me this use-case worked with the following window definition, which was
a bit of try-and-fail, and I can't claim I got 100% understanding of
windowing logic.
Here's my java code for Kinesis -> Parquet files which worked:
https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56
I hope it's not hard to derive beam-python window config from it.
Best Regards,
Pavel Solomin
Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>
On Fri, 17 Feb 2023 at 08:49, Wiśniowski Piotr <
contact.wisniowskipiotr@gmail.com> wrote:
> Hi,
>
> Sounds like exact problem that I have few emails before -
> https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
>
> Does this mean that Parquet IO does not support partitioning, and we need
> to do some workarounds? Like explicitly mapping each window to a separate
> Parquet file? This could be a solution in Your case, if it works (just idea
> worth trying but did not test it and do not have enough experience with
> Beam), but I am limited only to pure SQL and not sure how I can do it.
>
> Hope This helps with Your problem and Beam support could find some
> solution to my case too.
>
> Best
>
> Wiśniowski Piotr
> On 17.02.2023 02:00, Lydian wrote:
>
> I want to make a simple Beam pipeline which will store the events from
> kafka to S3 in parquet format every minute.
>
> Here's a simplified version of my pipeline:
>
> def add_timestamp(event: Any) -> Any:
> from datetime import datetime
> from apache_beam import window
>
> return window.TimestampedValue(event, datetime.timestamp(event[1].timestamp))
> # Actual Pipeline
> (
> pipeline
> | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
> | "Transformed" >> beam.Map(my_transform)
> | "Add timestamp" >> beam.Map(add_timestamp)
> | "window" >> beam.WindowInto(window.FixedWindows(60)) # 1 mins
> | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
> )
>
> However, the pipeline failed with
>
> GroupByKey cannot be applied to an unbounded PCollection with global windowing and a default trigger
>
> This seems to be coming from
> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146 which
> always add a GlobalWindows and thus causing this error. Wondering what I
> should do to correctly backup the event from Kafka (Unbounded) to S3.
> Thanks!
>
> btw, I am running with portableRunner with Flink. Beam Version is 2.41.0
> (the latest version seems to have the same code) and Flink version is 1.14.5
>
>
>
> Sincerely,
> Lydian Lee
>
>
Re: Backup event from Kafka to S3 in parquet format every minute
Posted by Wiśniowski Piotr <co...@gmail.com>.
Hi,
Sounds like exact problem that I have few emails before -
https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
Does this mean that Parquet IO does not support partitioning, and we
need to do some workarounds? Like explicitly mapping each window to a
separate Parquet file? This could be a solution in Your case, if it
works (just idea worth trying but did not test it and do not have enough
experience with Beam), but I am limited only to pure SQL and not sure
how I can do it.
Hope This helps with Your problem and Beam support could find some
solution to my case too.
Best
Wiśniowski Piotr
On 17.02.2023 02:00, Lydian wrote:
> I want to make a simple Beam pipeline which will store the events from
> kafka to S3 in parquet format every minute.
>
> Here's a simplified version of my pipeline:
>
> |def add_timestamp(event: Any) -> Any: from datetime import datetime
> from apache_beam import window return window.TimestampedValue(event,
> datetime.timestamp(event[1].timestamp)) # Actual Pipeline ( pipeline |
> "Read from Kafka" >> ReadFromKafka(consumer_config, topics,
> with_metadata=False) | "Transformed" >> beam.Map(my_transform) | "Add
> timestamp" >> beam.Map(add_timestamp) | "window" >>
> beam.WindowInto(window.FixedWindows(60)) # 1 mins | "writing to
> parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema) ) |
>
> However, the pipeline failed with
>
> |GroupByKey cannot be applied to an unbounded PCollection with global
> windowing and a default trigger |
>
> This seems to be coming from
> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146 which
> always add a |GlobalWindows| and thus causing this error. Wondering
> what I should do to correctly backup the event from Kafka (Unbounded)
> to S3. Thanks!
>
> btw, I am running with |portableRunner| with Flink. Beam Version is
> 2.41.0 (the latest version seems to have the same code) and Flink
> version is 1.14.5
>
>
>
> Sincerely,
> Lydian Lee
>