Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/29 07:41:13 UTC

[GitHub] [flink] Vancior opened a new pull request, #20391: [FLINK-28740][python][format] Support CsvBulkWriter

Vancior opened a new pull request, #20391:
URL: https://github.com/apache/flink/pull/20391

   
   
   ## What is the purpose of the change
   
   This PR adds support for using `CsvBulkWriter` in PyFlink. Although `CsvBulkWriter` is not directly documented, Java users can still use it in DataStream jobs. For Python users, this is not possible without extra wrapping, because there is no explicit factory class.
   
   ## Brief change log
   
   - add utility function to create factory for `CsvBulkWriter`
   - add preprocessing to convert Row to RowData before sinking to CSV
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   - `FileSinkCsvBulkWriterTests` in test_file_system.py
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs & Python Sphinx doc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Vancior commented on pull request #20391: [FLINK-28740][python][format] Support CsvBulkWriter

Posted by GitBox <gi...@apache.org>.
Vancior commented on PR #20391:
URL: https://github.com/apache/flink/pull/20391#issuecomment-1203582725

   @flinkbot run azure




[GitHub] [flink] dianfu closed pull request #20391: [FLINK-28740][python][format] Support CsvBulkWriter

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #20391: [FLINK-28740][python][format] Support CsvBulkWriter
URL: https://github.com/apache/flink/pull/20391




[GitHub] [flink] dianfu commented on a diff in pull request #20391: [FLINK-28740][python][format] Support CsvBulkWriter

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20391:
URL: https://github.com/apache/flink/pull/20391#discussion_r934209099


##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -599,8 +620,26 @@ def with_output_file_config(self, output_file_config: OutputFileConfig) \
                 output_file_config._j_output_file_config)
             return self
 
+        def _with_row_data_converter(self, row_type: 'RowType') -> 'FileSink.BulkFormatBuilder':

Review Comment:
   What about renaming it to something else as the given parameter isn't a converter?



##########
flink-python/pyflink/datastream/formats/csv.py:
##########
@@ -293,9 +304,62 @@ def for_schema(schema: 'CsvSchema') -> 'CsvReaderFormat':
         Builds a :class:`CsvReaderFormat` using `CsvSchema`.
         """
         jvm = get_gateway().jvm
-        j_csv_format = jvm.org.apache.flink.formats.csv.CsvReaderFormatFactory \
+        j_csv_format = jvm.org.apache.flink.formats.csv.PythonCsvUtils \
             .createCsvReaderFormat(
                 schema._j_schema,
-                _to_java_data_type(schema._data_type)
+                _to_java_data_type(schema._row_type)
             )
         return CsvReaderFormat(j_csv_format)
+
+
+class CsvBulkWriter(object):
+    """
+    CsvBulkWriter is for building :class:`BulkWriterFactory` to write Rows with a predefined CSV
+    schema to partitioned files in a bulk fashion.
+
+    Example:
+    ::
+
+        >>> schema = CsvSchema.builder() \\
+        ...     .add_number_column('id', number_type=DataTypes.INT()) \\
+        ...     .add_string_column('name') \\
+        ...     .add_array_column('list', ',', element_type=DataTypes.STRING()) \\
+        ...     .set_column_separator('|') \\
+        ...     .build()
+        >>> sink = FileSink.for_bulk_format(
+        ...     OUTPUT_DIR, CsvBulkWriter.for_schema(schema)).build()
+        >>> # If ds is a source stream, an identity map before sink is required
+        >>> ds.map(lambda e: e, output_type=schema.get_type_info()).sink_to(sink)
+
+    .. versionadded:: 1.16.0
+    """
+
+    class Factory(RowDataBulkWriterFactory):

Review Comment:
   Use RowDataBulkWriterFactory directly?



##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -161,6 +167,13 @@ def __init__(self, j_bulk_writer_factory):
         super().__init__(j_bulk_writer_factory)
 
 
+class RowDataBulkWriterFactory(BulkWriterFactory):
+

Review Comment:
   Add some Python doc for this class?



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -94,4 +100,30 @@ public byte[] serialize(Row row) {
             return wrappedSchema.serialize((T) row.getField(1));
         }
     }
+
+    /** A {@link ProcessFunction} that convert {@link Row} to {@link RowData}. */
+    public static class RowToRowDataProcessFunction extends ProcessFunction<Row, RowData> {
+
+        private static final long serialVersionUID = 1L;
+        private final DataType dataType;
+        private transient RowRowConverter converter;
+
+        public RowToRowDataProcessFunction(DataType dataType) {
+            this.dataType = dataType;
+        }
+
+        @Override
+        public void open(Configuration parameters) throws Exception {
+            super.open(parameters);
+            converter = RowRowConverter.create(dataType);
+            converter.open(RowToRowDataProcessFunction.class.getClassLoader());

Review Comment:
   ```suggestion
               converter.open(getRuntimeContext().getUserCodeClassLoader());
   ```



##########
flink-python/src/main/java/org/apache/flink/python/util/PythonConnectorUtils.java:
##########
@@ -94,4 +100,30 @@ public byte[] serialize(Row row) {
             return wrappedSchema.serialize((T) row.getField(1));
         }
     }
+
+    /** A {@link ProcessFunction} that convert {@link Row} to {@link RowData}. */
+    public static class RowToRowDataProcessFunction extends ProcessFunction<Row, RowData> {

Review Comment:
   ```suggestion
       public static class RowRowMapper extends RichMapFunction<Row, RowData> {
   ```



##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -526,8 +539,15 @@ class FileSink(Sink):
     the checkpoint from which we restore.
     """
 
-    def __init__(self, j_file_sink):
+    def __init__(self, j_file_sink, preprocessing=None):
         super(FileSink, self).__init__(sink=j_file_sink)
+        self._preprocessing = preprocessing
+
+    def need_preprocessing(self):
+        return self._preprocessing is not None
+
+    def get_preprocessing(self):

Review Comment:
   What about refactoring SupportPreprocessing as follows:
   ```
   def apply(ds):
     pass
   ```
   
   Then we could simplify the implementation and also avoid introducing TransformAppender.
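
A minimal sketch of the shape this suggestion might take, in plain Python. Only the `apply(ds)` method name comes from the comment; `FakeStream`, `CsvLikeSink`, and `PlainSink` are hypothetical stand-ins rather than PyFlink classes, and the actual interface in the PR may differ. The idea illustrated is that the sink itself rewrites the stream, so the caller needs no separate `need_preprocessing()` / `get_preprocessing()` pair.

```python
from abc import ABC, abstractmethod


class SupportPreprocessing(ABC):
    """Hypothetical mixin: a sink that rewrites its input stream before sinking."""

    @abstractmethod
    def apply(self, ds):
        """Return the stream that should actually be handed to the sink."""


class FakeStream:
    """Stand-in for a DataStream (assumption); it only records applied operations."""

    def __init__(self, ops=None):
        self.ops = list(ops or [])

    def map(self, name):
        # Return a new stream with one more recorded operation.
        return FakeStream(self.ops + [name])

    def sink_to(self, sink):
        # Let the sink preprocess the stream if it supports doing so.
        ds = sink.apply(self) if isinstance(sink, SupportPreprocessing) else self
        return ds.ops + ["sink"]


class PlainSink:
    """Hypothetical sink without preprocessing."""


class CsvLikeSink(SupportPreprocessing):
    """Hypothetical sink that converts Row to RowData before writing."""

    def apply(self, ds):
        return ds.map("row_to_row_data")
```

With this shape, `FakeStream().sink_to(CsvLikeSink())` routes through the conversion step, while a sink that does not implement `apply` receives the stream unchanged.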



##########
docs/content/docs/connectors/datastream/formats/csv.md:
##########
@@ -137,3 +137,20 @@ The corresponding CSV file:
 ```
 
 Similarly to the `TextLineInputFormat`, `CsvReaderFormat` can be used in both continues and batch modes (see [TextLineInputFormat]({{< ref "docs/connectors/datastream/formats/text_files" >}})  for examples).
+
+For PyFlink users, `CsvBulkWriter` is for creating `BulkWriterFactory` to write `Row` records to files in CSV format. Notice that if predecessor of sink is a source stream producing `RowData` records, e.g. CSV source, an identity map is required to make this work.

Review Comment:
   It's not quite clear to me why we need the identity map.





[GitHub] [flink] flinkbot commented on pull request #20391: [FLINK-28740][python][format] Support CsvBulkWriter

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20391:
URL: https://github.com/apache/flink/pull/20391#issuecomment-1198987415

   ## CI report:
   
   * 59c1f2cacfce32180c73baa371f8fb5e830f6370 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

