Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/14 11:20:27 UTC

[GitHub] [flink] HuangXingBo commented on a diff in pull request #20220: [FLINK-28464][python][format] Support CsvReaderFormat

HuangXingBo commented on code in PR #20220:
URL: https://github.com/apache/flink/pull/20220#discussion_r921041412


##########
docs/content/docs/connectors/datastream/formats/csv.md:
##########
@@ -113,6 +115,22 @@ CsvReaderFormat<ComplexPojo> csvFormat =
                         .build(),
                 TypeInformation.of(ComplexPojo.class));
 ```
+
+For PyFlink users, a csv schema can be defined by manually adding columns, and the output type of the csv source will be a Row with each column mapped to a field.
+```python
+schema = CsvSchema.builder() \
+    .add_number_column('id', number_type=DataTypes.BIGINT()) \

Review Comment:
   Why does this use `DataType` rather than `TypeInformation`?



##########
flink-python/pyflink/datastream/connectors/tests/test_file_system.py:
##########
@@ -34,6 +35,162 @@
 from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
 
 
+class FileSourceCsvReaderFormatTests(PyFlinkStreamingTestCase):
+
+    def setUp(self):
+        super().setUp()
+        self.test_sink = DataStreamTestSinkFunction()
+
+    def test_csv_primitive_column(self):
+        csv_file_name = tempfile.mktemp(suffix='.csv', dir=self.tempdir)

Review Comment:
   Could this temp-file creation logic be moved into `setUp`?
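
   A rough sketch of what that could look like, using only the standard library so it runs without PyFlink. The class name `CsvFileFixtureSketch` and the helper `_write_csv` are hypothetical and not part of the PR. The sketch creates its own `tempdir`, whereas the diff suggests the real test case already has one. It also swaps in `tempfile.mkstemp` for `tempfile.mktemp`, which the Python documentation marks as deprecated.

```python
import os
import shutil
import tempfile
import unittest


class CsvFileFixtureSketch(unittest.TestCase):
    """Hypothetical stand-in for the PR's test class; not the actual PyFlink code."""

    def setUp(self):
        super().setUp()
        # Assumption: the real base class already provides self.tempdir.
        self.tempdir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, self.tempdir, ignore_errors=True)
        # Create the CSV path once here so each test method can reuse it.
        fd, self.csv_file_name = tempfile.mkstemp(suffix='.csv', dir=self.tempdir)
        os.close(fd)

    def _write_csv(self, lines):
        # Hypothetical helper: write the given rows to the shared CSV file.
        with open(self.csv_file_name, 'w') as f:
            f.write('\n'.join(lines) + '\n')

    def test_csv_primitive_column(self):
        self._write_csv(['1,a', '2,b'])
        with open(self.csv_file_name) as f:
            self.assertEqual(f.read().splitlines(), ['1,a', '2,b'])
```

   With the file created in `setUp`, each test method only has to write its own rows, and cleanup is registered in one place.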



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org