Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/05/02 08:41:11 UTC

[GitHub] [flink] AHeise commented on a diff in pull request #19286: [FLINK-25931] Add projection pushdown support for CsvFormatFactory

AHeise commented on code in PR #19286:
URL: https://github.com/apache/flink/pull/19286#discussion_r862629212


##########
flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFormatFactoryTest.java:
##########
@@ -227,6 +228,24 @@ public void testInvalidIgnoreParseError() {
         createTableSink(SCHEMA, options);
     }
 
+    @Test
+    public void testProjectionPushdown() throws IOException {

Review Comment:
   When testing projections, it's always good to include cases that drop early columns. Only then do you test whether your converter maps each projected field to the correct source column.
   
   ```java
       @Test
       public void testProjectionPushdown() throws IOException {
           final Map<String, String> options = getAllOptions();
   
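           // Project away the leading column "a" so the converter must map
           // projected positions back to the right source columns.
           final Projection projection =
                   Projection.fromFieldNames(PHYSICAL_DATA_TYPE, Arrays.asList("b", "c"));
   
           final int[][] projectionMatrix = projection.toNestedIndexes();
           DeserializationSchema<RowData> actualDeser =
                   createDeserializationSchema(options, projectionMatrix);
   
           // Source fields a, b, c; assumes the test options set ';' as field delimiter.
           String data = "a1;2;false";
           RowData deserialized = actualDeser.deserialize(data.getBytes());
           GenericRowData expected = GenericRowData.of(2, false);
   
           // JUnit's assertEquals takes the expected value first, then the actual value.
           assertEquals(expected, deserialized);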
       }
   ```
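   To illustrate why dropping early columns matters, here is a plain-Java sketch of a hypothetical converter bug: one that copies the first N source fields instead of the projected ones. The names `buggyProject` and `correctProject` are illustrative only, not Flink APIs. A prefix projection such as `a, b` cannot tell the two converters apart; projecting `b, c` can.

   ```java
   import java.util.Arrays;

   public final class ProjectionIndexSketch {

       // Hypothetical buggy converter: ignores the projection and copies the first N fields.
       static String[] buggyProject(String[] fields, int[] projection) {
           String[] out = new String[projection.length];
           for (int i = 0; i < projection.length; i++) {
               out[i] = fields[i];
           }
           return out;
       }

       // Correct converter: output position i reads source field projection[i].
       static String[] correctProject(String[] fields, int[] projection) {
           String[] out = new String[projection.length];
           for (int i = 0; i < projection.length; i++) {
               out[i] = fields[projection[i]];
           }
           return out;
       }

       public static void main(String[] args) {
           String[] fields = "a1;2;false".split(";");
           int[] keepAB = {0, 1}; // drops only the trailing column c
           int[] keepBC = {1, 2}; // drops the leading column a

           // Prefix projection: both converters agree (prints true), so the bug goes unnoticed.
           System.out.println(
                   Arrays.equals(buggyProject(fields, keepAB), correctProject(fields, keepAB)));
           // Dropping an early column exposes the bug (prints false).
           System.out.println(
                   Arrays.equals(buggyProject(fields, keepBC), correctProject(fields, keepBC)));
       }
   }
   ```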



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -65,13 +65,14 @@ public final class CsvRowDataDeserializationSchema implements DeserializationSch
     private final boolean ignoreParseErrors;
 
     private CsvRowDataDeserializationSchema(
-            RowType rowType,
+            RowType rowResultType,
             TypeInformation<RowData> resultTypeInfo,
             CsvSchema csvSchema,
             boolean ignoreParseErrors) {
         this.resultTypeInfo = resultTypeInfo;
         this.runtimeConverter =
-                new CsvToRowDataConverters(ignoreParseErrors).createRowConverter(rowType, true);
+                new CsvToRowDataConverters(ignoreParseErrors)
+                        .createRowConverter(rowResultType, true);

Review Comment:
   The converter should be created in the builder. In a builder pattern, the constructor should be as simple as possible.
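   A minimal sketch of that split, in plain Java: the builder derives the converter in `build()` and passes it to a constructor that only assigns fields. `LineSchema`, `Builder`, and the delimiter-splitting converter are hypothetical stand-ins, not Flink's `CsvRowDataDeserializationSchema` or `CsvToRowDataConverters`.

   ```java
   import java.util.Objects;
   import java.util.function.Function;
   import java.util.regex.Pattern;

   public final class BuilderSketch {

       /** Hypothetical stand-in for a deserialization schema; not Flink's class. */
       static final class LineSchema {
           private final Function<String, String[]> converter;
           private final boolean ignoreParseErrors;

           // The constructor only assigns fields; everything derived is computed in build().
           private LineSchema(Function<String, String[]> converter, boolean ignoreParseErrors) {
               this.converter = converter;
               this.ignoreParseErrors = ignoreParseErrors;
           }

           String[] deserialize(String line) {
               return converter.apply(line);
           }

           boolean ignoresParseErrors() {
               return ignoreParseErrors;
           }
       }

       /** Collects configuration and performs all derivation work. */
       static final class Builder {
           private final String delimiter;
           private boolean ignoreParseErrors;

           Builder(String delimiter) {
               this.delimiter = Objects.requireNonNull(delimiter, "Delimiter must not be null.");
           }

           Builder setIgnoreParseErrors(boolean ignoreParseErrors) {
               this.ignoreParseErrors = ignoreParseErrors;
               return this;
           }

           LineSchema build() {
               // The converter is created here, from the final builder state,
               // rather than inside the LineSchema constructor.
               Pattern separator = Pattern.compile(Pattern.quote(delimiter));
               Function<String, String[]> converter = line -> separator.split(line, -1);
               return new LineSchema(converter, ignoreParseErrors);
           }
       }

       public static void main(String[] args) {
           LineSchema schema = new Builder(";").setIgnoreParseErrors(true).build();
           System.out.println(String.join(",", schema.deserialize("a1;2;false")));
       }
   }
   ```

   Keeping derivation in `build()` means the constructor cannot observe a half-configured builder, and every optional setter has already been applied when the converter is created.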



##########
flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvRowDataDeserializationSchema.java:
##########
@@ -81,19 +82,34 @@ private CsvRowDataDeserializationSchema(
     @Internal
     public static class Builder {
 
-        private final RowType rowType;
+        private final RowType rowResultType;
         private final TypeInformation<RowData> resultTypeInfo;
         private CsvSchema csvSchema;
         private boolean ignoreParseErrors;
 
+        /**
+         * Creates a CSV deserialization schema for the given {@link TypeInformation} with optional
+         * parameters.
+         */
+        public Builder(
+                RowType rowReadType,
+                RowType rowResultType,
+                TypeInformation<RowData> resultTypeInfo) {
+            Preconditions.checkNotNull(rowReadType, "RowType must not be null.");
+            Preconditions.checkNotNull(rowResultType, "RowType must not be null.");
+            Preconditions.checkNotNull(resultTypeInfo, "Result type information must not be null.");
+            this.rowResultType = rowResultType;
+            this.resultTypeInfo = resultTypeInfo;
+            this.csvSchema = CsvRowSchemaConverter.convert(rowReadType);

Review Comment:
   We can change the `ColumnType` of the projected-out columns in `CsvSchema` to `STRING`.
   Currently, we parse the data of those columns and then discard it. Declaring the ignored columns as `STRING` skips that conversion, which should be significantly faster for numeric fields.
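   A sketch of the idea under stated assumptions: the `ColumnType` enum below is a simplified stand-in for Jackson's `CsvSchema.ColumnType`, only top-level (non-nested) projection indexes are handled, and the example schema, where column `a` is numeric, is hypothetical. Projected-out columns stay in the schema because CSV columns are positional; only their declared type is relaxed to `STRING`.

   ```java
   import java.util.Arrays;

   public final class ProjectedColumnTypes {

       /** Simplified stand-in for Jackson's CsvSchema.ColumnType (assumption, not the real enum). */
       enum ColumnType { STRING, NUMBER, BOOLEAN }

       /**
        * Returns a copy of the read-schema column types in which every column that is not
        * projected is declared as STRING, so it is read without numeric/boolean conversion.
        */
       static ColumnType[] relaxUnprojected(ColumnType[] readTypes, int[] projectedIndexes) {
           boolean[] projected = new boolean[readTypes.length];
           for (int index : projectedIndexes) {
               projected[index] = true;
           }
           ColumnType[] result = new ColumnType[readTypes.length];
           for (int i = 0; i < readTypes.length; i++) {
               result[i] = projected[i] ? readTypes[i] : ColumnType.STRING;
           }
           return result;
       }

       public static void main(String[] args) {
           // Hypothetical columns a, b, c; only b and c are projected.
           ColumnType[] readTypes = {ColumnType.NUMBER, ColumnType.NUMBER, ColumnType.BOOLEAN};
           ColumnType[] relaxed = relaxUnprojected(readTypes, new int[] {1, 2});
           System.out.println(Arrays.toString(relaxed)); // [STRING, NUMBER, BOOLEAN]
       }
   }
   ```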



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.