You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Haibo Sun <su...@163.com> on 2019/07/02 03:18:02 UTC

Re:File Naming Pattern from HadoopOutputFormat

Hi, Andreas


I think the following things may be what you want.


1. For writing Avro, I think you can extend AvroOutputFormat and override the  getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


	public static class CustomAvroOutputFormat extends AvroOutputFormat {
		public CustomAvroOutputFormat(Path filePath, Class type) {
			super(filePath, type);
		}

		public CustomAvroOutputFormat(Class type) {
			super(type);
		}

		@Override
		public void open(int taskNumber, int numTasks) throws IOException {
			this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
			super.open(taskNumber, numTasks);
		}

		@Override
		protected String getDirectoryFileName(int taskNumber) {
			// returns a custom filename
			return null;
		}
	}


2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a class that implements the BucketAssigner interface and return a custom file name in the getBucketId() method (the value returned by getBucketId() will be treated as the file name).


ParquetStreamingFileSinkITCase:  https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


StreamingFileSink#forBulkFormat: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java


DateTimeBucketAssigner: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java




Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas" <An...@gs.com> wrote:


Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I’ve would like to include a UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-00001.snappy.parquet

tmp-r-00002.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this something which is natively supported? If so, could you please recommend how this could be done, or share the relevant document?

 

Best,

Andreas




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

RE: Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

Posted by "Hailu, Andreas" <An...@gs.com>.
Very well - thank you both.

// ah

From: Haibo Sun <su...@163.com>
Sent: Wednesday, July 3, 2019 9:37 PM
To: Hailu, Andreas [Tech] <An...@ny.email.gs.com>
Cc: Yitzchak Lieberman <yi...@sentinelone.com>; user@flink.apache.org
Subject: Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

Hi, Andreas

I'm glad you have had a solution. If you're interested in option 2 I talked about, you can follow up on the progress of the issue (https://issues.apache.org/jira/browse/FLINK-12573<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D12573&d=DwQGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=PjxKTfWKbjbcxTz3PnGvrR-7MEYlxraUaI1tTTQFqEw&s=pYKp2r7d5fGK3otU5dfUAmTaZf2cjeuVUMCmjupz8Ik&e=>) that Yitzchak said by watching it.

Best,
Haibo

At 2019-07-03 21:11:44, "Hailu, Andreas" <An...@gs.com>> wrote:

Hi Haibo, Yitzchak, thanks for getting back to me.

The pattern I chose to use which worked was to extend the HadoopOutputFormat class, override the open() method, and modify the "mapreduce.output.basename" configuration property to match my desired file naming structure.

// ah

From: Haibo Sun <su...@163.com>>
Sent: Tuesday, July 2, 2019 5:57 AM
To: Yitzchak Lieberman <yi...@sentinelone.com>>
Cc: Hailu, Andreas [Tech] <An...@ny.email.gs.com>>; user@flink.apache.org<ma...@flink.apache.org>
Subject: Re:Re: File Naming Pattern from HadoopOutputFormat


Hi, Andreas

You are right. To meet this requirement, Flink should need to expose a interface to allow customizing the filename.

Best,
Haibo

At 2019-07-02 16:33:44, "Yitzchak Lieberman" <yi...@sentinelone.com>> wrote:
regarding option 2 for parquet:
implementing bucket assigner won't set the file name as getBucketId() defined the directory for the files in case of partitioning the data, for example:
<root dir>/day=20190101/part-1-1
there is an open issue for that: https://issues.apache.org/jira/browse/FLINK-12573<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D12573&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=zGL975-dtGwduiOtS--MtzcwNbM6ti3ziA85_ki-Ql8&e=>

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun <su...@163.com>> wrote:
Hi, Andreas

I think the following things may be what you want.

1. For writing Avro, I think you can extend AvroOutputFormat and override the  getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.8_api_java_org_apache_flink_formats_avro_AvroOutputFormat.html&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=tgLhwjHX0wHWgUMSSmNEOUmpatTF5N7JYfUEtYzQyf4&e=>


          public static class CustomAvroOutputFormat extends AvroOutputFormat {

                              public CustomAvroOutputFormat(Path filePath, Class type) {

                                                   super(filePath, type);

                              }



                              public CustomAvroOutputFormat(Class type) {

                                                   super(type);

                              }



                              @Override

                              public void open(int taskNumber, int numTasks) throws IOException {

                                                   this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);

                                                   super.open(taskNumber, numTasks);

                              }



                              @Override

                              protected String getDirectoryFileName(int taskNumber) {

                                                   // returns a custom filename

                                                   return null;

                              }

          }

2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a class that implements the BucketAssigner interface and return a custom file name in the getBucketId() method (the value returned by getBucketId() will be treated as the file name).

ParquetStreamingFileSinkITCase:  https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dformats_flink-2Dparquet_src_test_java_org_apache_flink_formats_parquet_avro_ParquetStreamingFileSinkITCase.java&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=8d-ErhDksiRSGYq3JQhnEERmXTnKMk99N7KfWs08Hho&e=>

StreamingFileSink#forBulkFormat: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dstreaming-2Djava_src_main_java_org_apache_flink_streaming_api_functions_sink_filesystem_StreamingFileSink.java&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=-d96QSEVUNN702t_ejhD2TmXhc4fi8534hoQDbUrzAs&e=>

DateTimeBucketAssigner: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dstreaming-2Djava_src_main_java_org_apache_flink_streaming_api_functions_sink_filesystem_bucketassigners_DateTimeBucketAssigner.java&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=u7c_I1rxZZYWSTUSW-JG8-6dk5MNLXMdvXXTr1Z0nAw&e=>


Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas" <An...@gs.com>> wrote:
Hello Flink team,

I'm writing Avro and Parquet files to HDFS, and I've would like to include a UUID as a part of the file name.

Our files in HDFS currently follow this pattern:

tmp-r-00001.snappy.parquet
tmp-r-00002.snappy.parquet
...

I'm using a custom output format which extends a RichOutputFormat - is this something which is natively supported? If so, could you please recommend how this could be done, or share the relevant document?

Best,
Andreas

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

Re:RE: Re:Re: File Naming Pattern from HadoopOutputFormat

Posted by Haibo Sun <su...@163.com>.
Hi, Andreas  


I'm glad you have had a solution. If you're interested in option 2 I talked about, you can follow up on the progress of the issue (https://issues.apache.org/jira/browse/FLINK-12573) that Yitzchak said by watching it.


Best,
Haibo

At 2019-07-03 21:11:44, "Hailu, Andreas" <An...@gs.com> wrote:


Hi Haibo, Yitzchak, thanks for getting back to me.

 

The pattern I chose to use which worked was to extend the HadoopOutputFormat class, override the open() method, and modify the “mapreduce.output.basename” configuration property to match my desired file naming structure.

 

// ah

 

From: Haibo Sun <su...@163.com>
Sent: Tuesday, July 2, 2019 5:57 AM
To: Yitzchak Lieberman <yi...@sentinelone.com>
Cc: Hailu, Andreas [Tech] <An...@ny.email.gs.com>; user@flink.apache.org
Subject: Re:Re: File Naming Pattern from HadoopOutputFormat

 


Hi, Andreas 

 

You are right. To meet this requirement, Flink should need to expose a interface to allow customizing the filename.

 

Best,

Haibo


At 2019-07-02 16:33:44, "Yitzchak Lieberman" <yi...@sentinelone.com> wrote:



regarding option 2 for parquet:

implementing bucket assigner won't set the file name as getBucketId() defined the directory for the files in case of partitioning the data, for example:

<root dir>/day=20190101/part-1-1

there is an open issue for that: https://issues.apache.org/jira/browse/FLINK-12573

 

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun <su...@163.com> wrote:

Hi, Andreas

 

I think the following things may be what you want.

 

1. For writing Avro, I think you can extend AvroOutputFormat and override the  getDirectoryFileName() method to customize a file name, as shown below.

The javadoc of AvroOutputFormat: https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html

 

          public static class CustomAvroOutputFormat extends AvroOutputFormat {
                              public CustomAvroOutputFormat(Path filePath, Class type) {
                                                   super(filePath, type);
                              }
 
                              public CustomAvroOutputFormat(Class type) {
                                                   super(type);
                              }
 
                              @Override
                              public void open(int taskNumber, int numTasks) throws IOException {
                                                   this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
                                                   super.open(taskNumber, numTasks);
                              }
 
                              @Override
                              protected String getDirectoryFileName(int taskNumber) {
                                                   // returns a custom filename
                                                   return null;
                              }
          }

 

2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a class that implements the BucketAssigner interface and return a custom file name in the getBucketId() method (the value returned by getBucketId() will be treated as the file name).

 

ParquetStreamingFileSinkITCase:  https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java

 

StreamingFileSink#forBulkFormat: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java

 

DateTimeBucketAssigner: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java

 

 

Best,

Haibo


At 2019-07-02 04:15:07, "Hailu, Andreas" <An...@gs.com> wrote:



Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I’ve would like to include a UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-00001.snappy.parquet

tmp-r-00002.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this something which is natively supported? If so, could you please recommend how this could be done, or share the relevant document?

 

Best,

Andreas

 


Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

RE: Re:Re: File Naming Pattern from HadoopOutputFormat

Posted by "Hailu, Andreas" <An...@gs.com>.
Hi Haibo, Yitzchak, thanks for getting back to me.

The pattern I chose to use which worked was to extend the HadoopOutputFormat class, override the open() method, and modify the "mapreduce.output.basename" configuration property to match my desired file naming structure.

// ah

From: Haibo Sun <su...@163.com>
Sent: Tuesday, July 2, 2019 5:57 AM
To: Yitzchak Lieberman <yi...@sentinelone.com>
Cc: Hailu, Andreas [Tech] <An...@ny.email.gs.com>; user@flink.apache.org
Subject: Re:Re: File Naming Pattern from HadoopOutputFormat


Hi, Andreas

You are right. To meet this requirement, Flink should need to expose a interface to allow customizing the filename.

Best,
Haibo

At 2019-07-02 16:33:44, "Yitzchak Lieberman" <yi...@sentinelone.com>> wrote:

regarding option 2 for parquet:
implementing bucket assigner won't set the file name as getBucketId() defined the directory for the files in case of partitioning the data, for example:
<root dir>/day=20190101/part-1-1
there is an open issue for that: https://issues.apache.org/jira/browse/FLINK-12573<https://urldefense.proofpoint.com/v2/url?u=https-3A__issues.apache.org_jira_browse_FLINK-2D12573&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=zGL975-dtGwduiOtS--MtzcwNbM6ti3ziA85_ki-Ql8&e=>

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun <su...@163.com>> wrote:
Hi, Andreas

I think the following things may be what you want.

1. For writing Avro, I think you can extend AvroOutputFormat and override the  getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.8_api_java_org_apache_flink_formats_avro_AvroOutputFormat.html&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=tgLhwjHX0wHWgUMSSmNEOUmpatTF5N7JYfUEtYzQyf4&e=>


          public static class CustomAvroOutputFormat extends AvroOutputFormat {

                              public CustomAvroOutputFormat(Path filePath, Class type) {

                                                   super(filePath, type);

                              }



                              public CustomAvroOutputFormat(Class type) {

                                                   super(type);

                              }



                              @Override

                              public void open(int taskNumber, int numTasks) throws IOException {

                                                   this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);

                                                   super.open(taskNumber, numTasks);

                              }



                              @Override

                              protected String getDirectoryFileName(int taskNumber) {

                                                   // returns a custom filename

                                                   return null;

                              }

          }

2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a class that implements the BucketAssigner interface and return a custom file name in the getBucketId() method (the value returned by getBucketId() will be treated as the file name).

ParquetStreamingFileSinkITCase:  https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dformats_flink-2Dparquet_src_test_java_org_apache_flink_formats_parquet_avro_ParquetStreamingFileSinkITCase.java&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=8d-ErhDksiRSGYq3JQhnEERmXTnKMk99N7KfWs08Hho&e=>

StreamingFileSink#forBulkFormat: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dstreaming-2Djava_src_main_java_org_apache_flink_streaming_api_functions_sink_filesystem_StreamingFileSink.java&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=-d96QSEVUNN702t_ejhD2TmXhc4fi8534hoQDbUrzAs&e=>

DateTimeBucketAssigner: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java<https://urldefense.proofpoint.com/v2/url?u=https-3A__github.com_apache_flink_blob_master_flink-2Dstreaming-2Djava_src_main_java_org_apache_flink_streaming_api_functions_sink_filesystem_bucketassigners_DateTimeBucketAssigner.java&d=DwMGbg&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=KTuJ_S9VUApSpfhp2WtgGeinhaMG-qZuTb59kFHm3Z8&s=u7c_I1rxZZYWSTUSW-JG8-6dk5MNLXMdvXXTr1Z0nAw&e=>


Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas" <An...@gs.com>> wrote:

Hello Flink team,

I'm writing Avro and Parquet files to HDFS, and I've would like to include a UUID as a part of the file name.

Our files in HDFS currently follow this pattern:

tmp-r-00001.snappy.parquet
tmp-r-00002.snappy.parquet
...

I'm using a custom output format which extends a RichOutputFormat - is this something which is natively supported? If so, could you please recommend how this could be done, or share the relevant document?

Best,
Andreas

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

________________________________

Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

Re:Re: File Naming Pattern from HadoopOutputFormat

Posted by Haibo Sun <su...@163.com>.
Hi, Andreas 


You are right. To meet this requirement, Flink should need to expose a interface to allow customizing the filename.
 

Best,
Haibo

At 2019-07-02 16:33:44, "Yitzchak Lieberman" <yi...@sentinelone.com> wrote:

regarding option 2 for parquet:
implementing bucket assigner won't set the file name as getBucketId() defined the directory for the files in case of partitioning the data, for example:
<root dir>/day=20190101/part-1-1
there is an open issue for that: https://issues.apache.org/jira/browse/FLINK-12573


On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun <su...@163.com> wrote:

Hi, Andreas


I think the following things may be what you want.


1. For writing Avro, I think you can extend AvroOutputFormat and override the  getDirectoryFileName() method to customize a file name, as shown below.
The javadoc of AvroOutputFormat: https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html


	public static class CustomAvroOutputFormat extends AvroOutputFormat {
		public CustomAvroOutputFormat(Path filePath, Class type) {
			super(filePath, type);
		}

		public CustomAvroOutputFormat(Class type) {
			super(type);
		}

		@Override
		public void open(int taskNumber, int numTasks) throws IOException {
			this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
			super.open(taskNumber, numTasks);
		}

		@Override
		protected String getDirectoryFileName(int taskNumber) {
			// returns a custom filename
			return null;
		}
	}


2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase, StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create a class that implements the BucketAssigner interface and return a custom file name in the getBucketId() method (the value returned by getBucketId() will be treated as the file name).


ParquetStreamingFileSinkITCase:  https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java


StreamingFileSink#forBulkFormat: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java


DateTimeBucketAssigner: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java




Best,
Haibo

At 2019-07-02 04:15:07, "Hailu, Andreas" <An...@gs.com> wrote:


Hello Flink team,

 

I’m writing Avro and Parquet files to HDFS, and I’ve would like to include a UUID as a part of the file name.

 

Our files in HDFS currently follow this pattern:

 

tmp-r-00001.snappy.parquet

tmp-r-00002.snappy.parquet

...

 

I’m using a custom output format which extends a RichOutputFormat - is this something which is natively supported? If so, could you please recommend how this could be done, or share the relevant document?

 

Best,

Andreas




Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices

Re: File Naming Pattern from HadoopOutputFormat

Posted by Yitzchak Lieberman <yi...@sentinelone.com>.
regarding option 2 for parquet:
implementing bucket assigner won't set the file name as getBucketId()
defined the directory for the files in case of partitioning the data,
for example:
<root dir>/day=20190101/part-1-1
there is an open issue for that:
https://issues.apache.org/jira/browse/FLINK-12573

On Tue, Jul 2, 2019 at 6:18 AM Haibo Sun <su...@163.com> wrote:

> Hi, Andreas
>
> I think the following things may be what you want.
>
> 1. For writing Avro, I think you can extend AvroOutputFormat and override
> the  getDirectoryFileName() method to customize a file name, as shown below.
> The javadoc of AvroOutputFormat:
> https://ci.apache.org/projects/flink/flink-docs-release-1.8/api/java/org/apache/flink/formats/avro/AvroOutputFormat.html
>
> 	public static class CustomAvroOutputFormat extends AvroOutputFormat {
> 		public CustomAvroOutputFormat(Path filePath, Class type) {
> 			super(filePath, type);
> 		}
>
> 		public CustomAvroOutputFormat(Class type) {
> 			super(type);
> 		}
>
> 		@Override
> 		public void open(int taskNumber, int numTasks) throws IOException {
> 			this.setOutputDirectoryMode(OutputDirectoryMode.ALWAYS);
> 			super.open(taskNumber, numTasks);
> 		}
>
> 		@Override
> 		protected String getDirectoryFileName(int taskNumber) {
> 			// returns a custom filename
> 			return null;
> 		}
> 	}
>
>
> 2. For writing Parquet, you can refer to ParquetStreamingFileSinkITCase,
> StreamingFileSink#forBulkFormat and DateTimeBucketAssigner. You can create
> a class that implements the BucketAssigner interface and return a custom
> file name in the getBucketId() method (the value returned by getBucketId()
> will be treated as the file name).
>
> ParquetStreamingFileSinkITCase:
> https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>
> StreamingFileSink#forBulkFormat:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/StreamingFileSink.java
>
> DateTimeBucketAssigner:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/bucketassigners/DateTimeBucketAssigner.java
>
>
> Best,
> Haibo
>
> At 2019-07-02 04:15:07, "Hailu, Andreas" <An...@gs.com> wrote:
>
> Hello Flink team,
>
>
>
> I’m writing Avro and Parquet files to HDFS, and I’ve would like to include
> a UUID as a part of the file name.
>
>
>
> Our files in HDFS currently follow this pattern:
>
>
>
> *tmp-r-00001.snappy.parquet*
>
> *tmp-r-00002.snappy.parquet*
>
> *...*
>
>
>
> I’m using a custom output format which extends a RichOutputFormat - is
> this something which is natively supported? If so, could you please
> recommend how this could be done, or share the relevant document?
>
>
>
> Best,
>
> Andreas
>
> ------------------------------
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>
>