Posted to user@beam.apache.org by Tao Li <ta...@zillow.com> on 2021/03/04 01:35:59 UTC

Does writeDynamic() support writing different element groups to different output paths?

Hi Beam community,

I have a streaming app that writes each hour’s data to a folder named after that hour. With Flink (for example), we can leverage the “Bucketing File Sink”: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html

However, I don’t see that Beam FileIO’s writeDynamic API supports specifying different output paths for different groups: https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html

It seems that writeDynamic() only supports specifying a different naming strategy.

How can I specify hourly output paths for hourly data with Beam writeDynamic? Please advise. Thanks!



Re: Does writeDynamic() support writing different element groups to different output paths?

Posted by Tao Li <ta...@zillow.com>.
I was able to resolve the “unable to deserialize FileBasedSink” error by adding withNumShards().

inputData.apply(FileIO.<String, GenericRecord>writeDynamic()
    .by(record -> "test")
    .withDestinationCoder(StringUtf8Coder.of())
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNumShards(10)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
        @Override
        public FileIO.Write.FileNaming apply(String input) {
            return FileIO.Write.relativeFileNaming(
                ValueProvider.StaticValueProvider.of(outputPath + "/" + input), naming);
        }
    }));

Now I am seeing the new error below. Is it related to https://issues.apache.org/jira/browse/BEAM-9868? I don’t quite understand what this error means. Please advise.

Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
                at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
                at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.doFnWithExecutionInformationFromProto(ParDoTranslation.java:709)
                at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:392)
                at org.apache.beam.repackaged.direct_java.runners.core.construction.ParDoTranslation.getSchemaInformation(ParDoTranslation.java:377)
                at org.apache.beam.runners.direct.ParDoEvaluatorFactory.forApplication(ParDoEvaluatorFactory.java:87)


Re: Does writeDynamic() support writing different element groups to different output paths?

Posted by Tao Li <ta...@zillow.com> on Thursday, March 4, 2021 at 11:52 AM.
I tried the code below:

inputData.apply(FileIO.<String, GenericRecord>writeDynamic()
    .by(record -> "test")
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withNaming(new SimpleFunction<String, FileIO.Write.FileNaming>() {
        @Override
        public FileIO.Write.FileNaming apply(String input) {
            return FileIO.Write.relativeFileNaming(
                ValueProvider.StaticValueProvider.of(outputPath + "/" + input), naming);
        }
    })
    .withDestinationCoder(StringUtf8Coder.of()));

Exception in thread "main" java.lang.IllegalArgumentException: unable to deserialize FileBasedSink
                at org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:78)
                at org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.sinkFromProto(WriteFilesTranslation.java:125)
                at org.apache.beam.repackaged.direct_java.runners.core.construction.WriteFilesTranslation.getSink(WriteFilesTranslation.java:137)
                at org.apache.beam.runners.direct.WriteWithShardingFactory.getReplacementTransform(WriteWithShardingFactory.java:69)
                at org.apache.beam.sdk.Pipeline.applyReplacement(Pipeline.java:564)
                at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:299)

When I switch to the write() API as below, it works fine. Does anyone have any ideas? Thanks!

inputData.apply(FileIO.<GenericRecord>write()
    .withNumShards(10)
    .via(ParquetIO.sink(inputAvroSchema))
    .to(outputPath)
    .withSuffix(".parquet"));



Re: Does writeDynamic() support writing different element groups to different output paths?

Posted by Tao Li <ta...@zillow.com> on Thursday, March 4, 2021 at 9:36 AM.
Thanks Kobe, let me give it a try!


Re: Does writeDynamic() support writing different element groups to different output paths?

Posted by Kobe Feng <fl...@gmail.com> on Wednesday, March 3, 2021 at 9:33 PM.
I used the following approach a long time ago to write into partitions in HDFS
(others may have better solutions). The interface may have changed since then,
so please check:

val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix, FileIO.writeDynamic[String, GenericRecord]()
  .by(recordPartition.partitionFunc)
  .withDestinationCoder(StringUtf8Coder.of())
  .via(DarwinParquetIO.sink(...)
  .to(baseDir)
   ...
  .withNaming((partitionFolder: String) => relativeFileNaming(StaticValueProvider.of[String](baseDir + Path.SEPARATOR + partitionFolder), fileNaming))
   ...

val partitionFunc: T => String

A good practice is to auto-switch: partition by the event-time field from the
record value when using event-time windows, and by processing time otherwise.
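The auto-switch idea could be sketched in plain Java roughly as follows. This is only an illustration under assumptions: the class HourlyPartition, its method names, the yyyy-MM-dd-HH folder pattern, and the use of UTC are hypothetical and do not come from the code above. A null event time stands in for "no event-time field available".

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper for illustration; not part of Beam or of the code above.
final class HourlyPartition {
    // The folder pattern and the UTC zone are assumptions made for this sketch.
    private static final DateTimeFormatter HOUR_FOLDER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd-HH").withZone(ZoneOffset.UTC);

    private HourlyPartition() {}

    // Prefer the record's event time; fall back to processing time when it is absent.
    static Instant partitionTime(Instant eventTime, Instant processingTime) {
        return eventTime != null ? eventTime : processingTime;
    }

    // Map the chosen timestamp to the name of its hourly folder.
    static String folderFor(Instant eventTime, Instant processingTime) {
        return HOUR_FOLDER.format(partitionTime(eventTime, processingTime));
    }
}
```

A function along these lines would sit inside whatever is passed to .by(...), so that the returned string is the partition folder that the naming function later receives.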
partitionFunc can also combine multiple partition columns to produce
subdirectories, based on your file system's path separator (e.g. for S3).
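As a rough sketch of the multi-column case, the partition folder could be built by joining the column values with the path separator. Everything here is an assumption for illustration: the class PartitionPath, the column=value layout, and the column names in the usage note are hypothetical and not taken from the code above.

```java
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper for illustration; not part of Beam or of the code above.
final class PartitionPath {
    private PartitionPath() {}

    // Join column=value pairs, in the map's iteration order, into nested subdirectories.
    static String of(Map<String, String> partitionColumns, String separator) {
        StringJoiner path = new StringJoiner(separator);
        for (Map.Entry<String, String> column : partitionColumns.entrySet()) {
            path.add(column.getKey() + "=" + column.getValue());
        }
        return path.toString();
    }
}
```

With an insertion-ordered map holding date=2021-03-04 and hour=01 and "/" as the separator, this yields date=2021-03-04/hour=01, which could then serve as the partition folder string.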




-- 
Yours Sincerely
Kobe Feng