Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 08:48:58 UTC

[GitHub] [flink] Vancior opened a new pull request, #20220: [FLINK-28464][python][format] Support CsvReaderFormat

Vancior opened a new pull request, #20220:
URL: https://github.com/apache/flink/pull/20220

   
   
   ## What is the purpose of the change
   
   This PR adds support for reading CSV files with the `CsvReaderFormat` API in PyFlink.
   
   ## Verifying this change
   
   
   
   This change added tests and can be verified as follows:
   
   - integration test `FileSourceCsvReaderFormatTests` in `test_file_system.py`
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (yes)
     - If yes, how is the feature documented? (docs & Python Sphinx doc)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dianfu closed pull request #20220: [FLINK-28464][python][format] Support CsvReaderFormat

Posted by GitBox <gi...@apache.org>.
dianfu closed pull request #20220: [FLINK-28464][python][format] Support CsvReaderFormat
URL: https://github.com/apache/flink/pull/20220




[GitHub] [flink] HuangXingBo commented on a diff in pull request #20220: [FLINK-28464][python][format] Support CsvReaderFormat

Posted by GitBox <gi...@apache.org>.
HuangXingBo commented on code in PR #20220:
URL: https://github.com/apache/flink/pull/20220#discussion_r921041412


##########
docs/content/docs/connectors/datastream/formats/csv.md:
##########
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
                         .build(),
                 TypeInformation.of(ComplexPojo.class));
 ```
+
+For PyFlink users, a csv schema can be defined by manually adding columns, and the output type of the csv source will be a Row with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+    .add_number_column('id', number_type=DataTypes.BIGINT()) \

Review Comment:
   Why use `DataType` here instead of `TypeInformation`?



##########
flink-python/pyflink/datastream/connectors/tests/test_file_system.py:
##########
@@ -34,6 +35,162 @@
 from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
 
 
+class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.test_sink = DataStreamTestSinkFunction()
+
+    def test_csv_primitive_column(self):
+        csv_file_name = tempfile.mktemp(suffix='.csv', dir=self.tempdir)

Review Comment:
   Could this logic be moved into `setUp`?
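   A minimal sketch of the suggestion, using plain `unittest.TestCase` in place of `PyFlinkStreamingTestCase` (which, judging from the diff, already provides `self.tempdir`). The names `CsvFileTestBase`, `ExampleTests` and `_write_csv` are hypothetical. The sketch also swaps `tempfile.mktemp` for `tempfile.mkstemp`, because the Python documentation deprecates `mktemp` as unsafe.

```python
import os
import shutil
import tempfile
import unittest


class CsvFileTestBase(unittest.TestCase):
    """Hypothetical base class: prepares a fresh, empty CSV file per test."""

    def setUp(self):
        super().setUp()
        # Own temp dir per test, so tests never see each other's files.
        self.tempdir = tempfile.mkdtemp()
        # mkstemp creates the file atomically and returns an open descriptor.
        fd, self.csv_file_name = tempfile.mkstemp(suffix='.csv', dir=self.tempdir)
        os.close(fd)

    def tearDown(self):
        shutil.rmtree(self.tempdir, ignore_errors=True)
        super().tearDown()

    def _write_csv(self, lines):
        # Write the given lines to this test's CSV file, newline-terminated.
        with open(self.csv_file_name, 'w') as f:
            f.write('\n'.join(lines) + '\n')


class ExampleTests(CsvFileTestBase):
    def test_file_starts_empty(self):
        self.assertEqual(os.path.getsize(self.csv_file_name), 0)

    def test_write_and_read_back(self):
        self._write_csv(['1,a', '2,b'])
        with open(self.csv_file_name) as f:
            self.assertEqual(f.read().splitlines(), ['1,a', '2,b'])
```

   Each test method then only writes its own rows and builds its source; the file path bookkeeping lives in one place. Run it with `python -m unittest`.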





[GitHub] [flink] Vancior commented on a diff in pull request #20220: [FLINK-28464][python][format] Support CsvReaderFormat

Posted by GitBox <gi...@apache.org>.
Vancior commented on code in PR #20220:
URL: https://github.com/apache/flink/pull/20220#discussion_r921850578


##########
docs/content/docs/connectors/datastream/formats/csv.md:
##########
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
                         .build(),
                 TypeInformation.of(ComplexPojo.class));
 ```
+
+For PyFlink users, a csv schema can be defined by manually adding columns, and the output type of the csv source will be a Row with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+    .add_number_column('id', number_type=DataTypes.BIGINT()) \

Review Comment:
   Because I'm basically reusing the code implemented for the Table API, whose interfaces are designed around `DataType`.
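   The documented behaviour (columns added one by one to a schema, each record emitted as a Row with one field per column) can be illustrated without PyFlink. The sketch below is a stdlib-only illustration, not PyFlink's implementation: it assumes plain strings (`'BIGINT'`, `'DOUBLE'`, `'STRING'`) standing in for `DataTypes` objects, and a `namedtuple` standing in for PyFlink's `Row`. `CONVERTERS`, `build_row_type` and `read_rows` are hypothetical names.

```python
import csv
import io
from collections import namedtuple

# Hypothetical stand-ins for a few Table API type names; the real schema
# builder takes DataTypes objects such as DataTypes.BIGINT().
CONVERTERS = {
    'BIGINT': int,
    'DOUBLE': float,
    'STRING': str,
}


def build_row_type(columns):
    """columns: list of (name, type_name) pairs, in file order."""
    return namedtuple('Row', [name for name, _ in columns])


def read_rows(text, columns, separator=','):
    """Parse CSV text into one Row per record, one field per column."""
    row_type = build_row_type(columns)
    converters = [CONVERTERS[type_name] for _, type_name in columns]
    rows = []
    for record in csv.reader(io.StringIO(text), delimiter=separator):
        if not record:
            continue  # skip blank lines
        if len(record) != len(columns):
            raise ValueError(f'expected {len(columns)} columns, got {len(record)}')
        rows.append(row_type(*(conv(value) for conv, value in zip(converters, record))))
    return rows
```

   For example, `read_rows('1,alice\n2,bob\n', [('id', 'BIGINT'), ('name', 'STRING')])` yields two rows whose `id` field is an `int` and whose `name` field is a `str`, mirroring how a typed column maps to a typed Row field.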





[GitHub] [flink] flinkbot commented on pull request #20220: [FLINK-28464][python][format] Support CsvReaderFormat

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20220:
URL: https://github.com/apache/flink/pull/20220#issuecomment-1178738356

   ## CI report:
   
   * 8f73f57fcdf314406b3a7b4bb0745c62c359354e UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>

