Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/27 16:28:53 UTC

[GitHub] [flink] Vancior opened a new pull request, #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Vancior opened a new pull request, #20383:
URL: https://github.com/apache/flink/pull/20383

   
   
   ## What is the purpose of the change
   
   This PR adds support for using `AvroWriters.for_generic_record` to write vanilla Python objects to files via FileSink.
   
   
   ## Brief change log
   
   - add Avro encoder on Python side
   - port Java AvroWriters.forGenericRecord to Python
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
   - `FileSinkAvroWritersTests` in test_file_system.py
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs & Python Sphinx doc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] Vancior commented on a diff in pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20383:
URL: https://github.com/apache/flink/pull/20383#discussion_r934082286


##########
flink-python/pyflink/datastream/formats/csv.py:
##########
@@ -131,7 +131,7 @@ def add_columns_from(self, schema: 'CsvSchema') -> 'CsvSchemaBuilder':
         :param schema: Another :class:`CsvSchema`.
         """
         self._j_schema_builder.addColumnsFrom(schema._j_schema)
-        for field in cast(schema._data_type, RowType):

Review Comment:
   I could add this in the CSV PR.





[GitHub] [flink] Vancior commented on a diff in pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20383:
URL: https://github.com/apache/flink/pull/20383#discussion_r934093658


##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -535,6 +568,51 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
         return FileSink.RowFormatBuilder(
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
+    class BulkFormatBuilder(object):
+        """
+        Builder for the vanilla FileSink using a bulk format.
+        """
+
+        def __init__(self, j_bulk_format_builder):

Review Comment:
   Maybe we can split that into a separate PR? Here I'm just aligning with the builder in StreamingFileSink.





[GitHub] [flink] dianfu closed pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters
URL: https://github.com/apache/flink/pull/20383




[GitHub] [flink] dianfu commented on a diff in pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20383:
URL: https://github.com/apache/flink/pull/20383#discussion_r934112786


##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -535,6 +568,51 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
         return FileSink.RowFormatBuilder(
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
+    class BulkFormatBuilder(object):
+        """
+        Builder for the vanilla FileSink using a bulk format.
+        """
+
+        def __init__(self, j_bulk_format_builder):

Review Comment:
   StreamingFileSink is to be removed; see https://issues.apache.org/jira/browse/FLINK-28641 for more details. Will we align these methods in 1.16?





[GitHub] [flink] Vancior commented on a diff in pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20383:
URL: https://github.com/apache/flink/pull/20383#discussion_r934082090


##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java:
##########
@@ -51,15 +51,7 @@ public class AvroParquetReaders {
     public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(
             final Class<T> typeClass) {
         return new AvroParquetRecordFormat<>(
-                new AvroTypeInfo<>(typeClass),

Review Comment:
   Yeah, I found that the change is only needed for the GenericRecord we provide in Python.





[GitHub] [flink] dianfu commented on a diff in pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
dianfu commented on code in PR #20383:
URL: https://github.com/apache/flink/pull/20383#discussion_r933755193


##########
flink-python/pyflink/datastream/formats/csv.py:
##########
@@ -131,7 +131,7 @@ def add_columns_from(self, schema: 'CsvSchema') -> 'CsvSchemaBuilder':
         :param schema: Another :class:`CsvSchema`.
         """
         self._j_schema_builder.addColumnsFrom(schema._j_schema)
-        for field in cast(schema._data_type, RowType):

Review Comment:
   Could we update the test cases to cover this method?
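
The loop removed in the hunk above passes its arguments in the opposite order from `typing.cast(typ, val)`, which returns its second argument unchanged. A minimal sketch of why a test that exercises this method would catch it, assuming `cast` in `csv.py` is `typing.cast`; the `RowType` class below is a hypothetical stand-in for illustration, not PyFlink's own class:

```python
from typing import List, cast


class RowType:
    """Hypothetical stand-in for a row type: iterating yields field names."""

    def __init__(self, fields: List[str]):
        self.fields = fields

    def __iter__(self):
        return iter(self.fields)


data_type: object = RowType(["id", "name"])

# Swapped arguments: cast(typ, val) returns val, so this expression evaluates
# to the RowType class itself, and a class object is not iterable.
try:
    for _ in cast(data_type, RowType):
        pass
    swapped_error = None
except TypeError as exc:
    swapped_error = exc

# Type first, value second: the value comes back unchanged and iterates normally.
fields = [name for name in cast(RowType, data_type)]
```

Because `typing.cast` does nothing at runtime beyond returning the value, the swapped form only fails when the loop actually runs, which is presumably why it went unnoticed without test coverage.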



##########
flink-python/pyflink/datastream/connectors/file_system.py:
##########
@@ -535,6 +568,51 @@ def for_row_format(base_path: str, encoder: Encoder) -> 'FileSink.RowFormatBuild
         return FileSink.RowFormatBuilder(
             JFileSink.forRowFormat(JPath(base_path), encoder._j_encoder))
 
+    class BulkFormatBuilder(object):
+        """
+        Builder for the vanilla FileSink using a bulk format.
+        """
+
+        def __init__(self, j_bulk_format_builder):

Review Comment:
   Several methods in the Java BulkFormatBuilder are missing here, e.g. enableCompact, disableCompact, etc. Is there any reason for not aligning them here?
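
As a rough illustration of what aligning the wrapper could look like, here is a minimal sketch in which each Python method delegates to the wrapped Java builder and returns `self` for chaining. The Python method names `enable_compact` and `disable_compact`, the argument list, and the `FakeJavaBulkFormatBuilder` stand-in are all assumptions made for this example; they are not taken from the PR:

```python
class FakeJavaBulkFormatBuilder:
    """Hypothetical stand-in for the Java builder; it only records calls."""

    def __init__(self):
        self.calls = []

    def enableCompact(self, strategy, compactor):
        self.calls.append(("enableCompact", strategy, compactor))
        return self

    def disableCompact(self):
        self.calls.append(("disableCompact",))
        return self


class BulkFormatBuilder:
    """Sketch of a Python-side builder that mirrors the Java method set."""

    def __init__(self, j_bulk_format_builder):
        self._j_bulk_format_builder = j_bulk_format_builder

    def enable_compact(self, strategy, compactor) -> "BulkFormatBuilder":
        # Delegate to the Java-side method and keep the fluent interface.
        self._j_bulk_format_builder.enableCompact(strategy, compactor)
        return self

    def disable_compact(self) -> "BulkFormatBuilder":
        self._j_bulk_format_builder.disableCompact()
        return self


j_builder = FakeJavaBulkFormatBuilder()
builder = (
    BulkFormatBuilder(j_builder)
    .enable_compact("strategy", "compactor")  # placeholder arguments
    .disable_compact()
)
```

In a real wrapper the placeholder strings would be replaced by whatever objects the Java builder expects.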



##########
flink-python/pyflink/datastream/formats/avro.py:
##########
@@ -63,6 +70,9 @@ class GenericRecordAvroTypeInfo(TypeInformation):
     A :class:`TypeInformation` of Avro's GenericRecord, including the schema. This is a wrapper of
     Java org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo.

Review Comment:
   This is also a public class that users will use directly, so should we add it to `__init__.py` to expose it?



##########
flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/avro/AvroParquetReaders.java:
##########
@@ -51,15 +51,7 @@ public class AvroParquetReaders {
     public static <T extends SpecificRecordBase> StreamFormat<T> forSpecificRecord(
             final Class<T> typeClass) {
         return new AvroParquetRecordFormat<>(
-                new AvroTypeInfo<>(typeClass),

Review Comment:
   It seems that you are reverting the changes made in #19956. Are there any specific reasons?





[GitHub] [flink] flinkbot commented on pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20383:
URL: https://github.com/apache/flink/pull/20383#issuecomment-1196989858

   ## CI report:
   
   * 3586fde44ca955f158189762ae60245f97961a72 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] Vancior commented on a diff in pull request #20383: [FLINK-28664][python][format] FileSink supports AvroWriters

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20383:
URL: https://github.com/apache/flink/pull/20383#discussion_r934082463


##########
flink-python/pyflink/datastream/formats/avro.py:
##########
@@ -63,6 +70,9 @@ class GenericRecordAvroTypeInfo(TypeInformation):
     A :class:`TypeInformation` of Avro's GenericRecord, including the schema. This is a wrapper of
     Java org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo.

Review Comment:
   Quite right, I'll update it.


