Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/03 16:55:46 UTC

[flink] 07/08: [FLINK-24687][table-runtime] Refactored the test CSV format to be independent of the planner (except ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter) and to implement SerializationSchema rather than BulkWriterFormatFactory. Moved it to a dedicated package

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6bb090751093e4f9f8a05c80857af11381f88599
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Wed Nov 24 15:07:12 2021 +0100

    [FLINK-24687][table-runtime] Refactored the test CSV format to be independent of the planner (except ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter) and to implement SerializationSchema rather than BulkWriterFormatFactory. Moved it to a dedicated package
    
    Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
 .../table/filesystem/FileSystemTableSink.java      |  5 +-
 .../testcsv}/TestCsvDeserializationSchema.java     | 35 +++++----
 .../testcsv/TestCsvFormatFactory.java}             | 89 +++++-----------------
 .../testcsv/TestCsvSerializationSchema.java        | 58 ++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  2 +-
 5 files changed, 104 insertions(+), 85 deletions(-)
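
The heart of the change is the encoder contract swap named in the subject. As a rough contrast (both interfaces are real Flink APIs, but the snippet is illustrative only and not part of the patch):

    // Old contract: a BulkWriterFormatFactory yields a stateful writer
    // bound to an output stream owned by the filesystem sink.
    BulkWriter<RowData> writer = bulkWriterFactory.create(outputStream);
    writer.addElement(rowData);
    writer.flush();
    writer.finish();

    // New contract: a SerializationFormatFactory yields a stateless
    // per-record function from RowData to bytes; no stream handling.
    byte[] line = serializationSchema.serialize(rowData);

Because the SerializationSchema never touches a stream, the test format can drop its private CsvBulkWriter (see the factory diff below) and reuse the sink's generic writer infrastructure.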

diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 2e9af35..bbd7425 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -343,7 +343,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable
             @Override
             public DynamicTableSource.DataStructureConverter createDataStructureConverter(
                     DataType producedDataType) {
-                throw new TableException("Compaction reader not support DataStructure converter.");
+                // This method cannot be implemented without changing the
+                // DynamicTableSink.DataStructureConverter interface
+                throw new UnsupportedOperationException(
+                        "Compaction reader does not support the DataStructure converter.");
             }
         };
     }
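
For context on the replaced exception: a DynamicTableSource.DataStructureConverter turns external objects into Flink's internal data structures. A minimal usage sketch (real API calls; the variables are assumed setup):

    // Obtain a converter from a provider context, open it, and convert
    // an external Row into the internal RowData representation.
    DynamicTableSource.DataStructureConverter converter =
            providerContext.createDataStructureConverter(producedDataType);
    converter.open(RuntimeConverter.Context.create(MyFormat.class.getClassLoader()));
    RowData internal = (RowData) converter.toInternal(externalRow);

The compaction reader context built here cannot hand out such a converter, so UnsupportedOperationException (a deliberate "not implemented" signal) fits better than the planner-flavored TableException.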
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
similarity index 83%
rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
index dbec987..1569e0d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
@@ -16,24 +16,22 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.filesystem;
+package org.apache.flink.formats.testcsv;
 
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.conversion.DataStructureConverter;
-import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.parser.FieldParser;
 import org.apache.flink.util.InstantiationUtil;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.function.Function;
 
 /**
  * The {@link DeserializationSchema} that outputs {@link RowData}.
@@ -41,7 +39,7 @@ import java.util.List;
  * <p>NOTE: This is meant only for testing purposes and doesn't provide a feature-complete, stable CSV
  * parser! If you need a feature-complete CSV parser, check out the flink-csv package.
  */
-public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
+class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
 
     private final List<DataType> physicalFieldTypes;
     private final int physicalFieldCount;
@@ -49,20 +47,34 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
     private final TypeInformation<RowData> typeInfo;
     private final int[] indexMapping;
 
-    @SuppressWarnings("rawtypes")
-    private transient DataStructureConverter[] csvRowToRowDataConverters;
+    private final DynamicTableSource.DataStructureConverter[] csvRowToRowDataConverters;
 
     private transient FieldParser<?>[] fieldParsers;
 
-    public TestCsvDeserializationSchema(DataType physicalDataType, List<String> orderedCsvColumns) {
+    public TestCsvDeserializationSchema(
+            DataType physicalDataType,
+            TypeInformation<RowData> typeInfo,
+            List<String> orderedCsvColumns,
+            Function<DataType, DynamicTableSource.DataStructureConverter> converterFactory) {
         this.physicalFieldTypes = DataType.getFieldDataTypes(physicalDataType);
         this.physicalFieldCount = physicalFieldTypes.size();
-        this.typeInfo = InternalTypeInfo.of((RowType) physicalDataType.getLogicalType());
+        this.typeInfo = typeInfo;
 
         List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType);
         this.indexMapping =
                 orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray();
 
+        // Init data converters
+        int csvRowLength = indexMapping.length;
+        this.csvRowToRowDataConverters =
+                new DynamicTableSource.DataStructureConverter[csvRowLength];
+        for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
+            if (indexMapping[csvColumn] != -1) {
+                DataType fieldType = physicalFieldTypes.get(indexMapping[csvColumn]);
+                this.csvRowToRowDataConverters[csvColumn] = converterFactory.apply(fieldType);
+            }
+        }
+
         initFieldParsers();
     }
 
@@ -103,7 +115,6 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
     private void initFieldParsers() {
         int csvRowLength = indexMapping.length;
         this.fieldParsers = new FieldParser<?>[csvRowLength];
-        this.csvRowToRowDataConverters = new DataStructureConverter[csvRowLength];
         for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
             if (indexMapping[csvColumn] == -1) {
                 // The output type doesn't include this field, so just assign a string parser to
@@ -125,8 +136,6 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
             FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
 
             this.fieldParsers[csvColumn] = p;
-            this.csvRowToRowDataConverters[csvColumn] =
-                    DataStructureConverters.getConverter(fieldType);
         }
     }
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
similarity index 54%
rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
index 5bae8f5..fa564c4 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
@@ -16,10 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.filesystem;
+package org.apache.flink.formats.testcsv;
 
-import org.apache.flink.api.common.serialization.BulkWriter;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.table.connector.ChangelogMode;
@@ -30,34 +30,23 @@ import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.factories.BulkWriterFormatFactory;
 import org.apache.flink.table.factories.DeserializationFormatFactory;
 import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
-import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
-
 /**
  * Factory for csv test format.
  *
  * <p>NOTE: This is meant only for testing purposes and doesn't provide a feature-complete, stable CSV
  * parser! If you need a feature-complete CSV parser, check out the flink-csv package.
  */
-public class TestCsvFileSystemFormatFactory
-        implements DeserializationFormatFactory, BulkWriterFormatFactory {
+public class TestCsvFormatFactory
+        implements DeserializationFormatFactory, SerializationFormatFactory {
 
     @Override
     public String factoryIdentifier() {
@@ -74,40 +63,17 @@ public class TestCsvFileSystemFormatFactory
         return new HashSet<>();
     }
 
-    private static void writeCsvToStream(DataType[] types, RowData rowData, OutputStream stream)
-            throws IOException {
-        LogicalType[] fieldTypes =
-                Arrays.stream(types).map(DataType::getLogicalType).toArray(LogicalType[]::new);
-        DataFormatConverters.DataFormatConverter converter =
-                DataFormatConverters.getConverterForDataType(
-                        TypeConversions.fromLogicalToDataType(RowType.of(fieldTypes)));
-
-        Row row = (Row) converter.toExternal(rowData);
-        StringBuilder builder = new StringBuilder();
-        Object o;
-        for (int i = 0; i < row.getArity(); i++) {
-            if (i > 0) {
-                builder.append(DEFAULT_FIELD_DELIMITER);
-            }
-            if ((o = row.getField(i)) != null) {
-                builder.append(o);
-            }
-        }
-        String str = builder.toString();
-        stream.write(str.getBytes(StandardCharsets.UTF_8));
-        stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
-    }
-
     @Override
-    public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
+    public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
             DynamicTableFactory.Context context, ReadableConfig formatOptions) {
-        return new EncodingFormat<BulkWriter.Factory<RowData>>() {
+        return new EncodingFormat<SerializationSchema<RowData>>() {
             @Override
-            public BulkWriter.Factory<RowData> createRuntimeEncoder(
+            public SerializationSchema<RowData> createRuntimeEncoder(
                     DynamicTableSink.Context context, DataType consumedDataType) {
-                return out ->
-                        new CsvBulkWriter(
-                                consumedDataType.getChildren().toArray(new DataType[0]), out);
+                DynamicTableSink.DataStructureConverter converter =
+                        context.createDataStructureConverter(consumedDataType);
+
+                return new TestCsvSerializationSchema(converter);
             }
 
             @Override
@@ -129,7 +95,12 @@ public class TestCsvFileSystemFormatFactory
                 DataType projectedPhysicalDataType =
                         Projection.of(projections).project(physicalDataType);
                 return new TestCsvDeserializationSchema(
-                        projectedPhysicalDataType, DataType.getFieldNames(physicalDataType));
+                        projectedPhysicalDataType,
+                        context.createTypeInformation(projectedPhysicalDataType),
+                        DataType.getFieldNames(physicalDataType),
+                        // Check out the FileSystemTableSink#createSourceContext for more details on
+                        // why we need this
+                        ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter);
             }
 
             @Override
@@ -138,26 +109,4 @@ public class TestCsvFileSystemFormatFactory
             }
         };
     }
-
-    private static class CsvBulkWriter implements BulkWriter<RowData> {
-
-        private final DataType[] types;
-        private final OutputStream stream;
-
-        private CsvBulkWriter(DataType[] types, OutputStream stream) {
-            this.types = types;
-            this.stream = stream;
-        }
-
-        @Override
-        public void addElement(RowData element) throws IOException {
-            writeCsvToStream(types, element, stream);
-        }
-
-        @Override
-        public void flush() {}
-
-        @Override
-        public void finish() {}
-    }
 }
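
End to end, the planner discovers this factory through the Factory SPI file updated at the bottom of this patch. A usage sketch, assuming the factory identifier is "testcsv" (the identifier is not visible in these hunks):

    // Register a filesystem table whose rows are encoded with the test format;
    // reads go through createDecodingFormat, writes through createEncodingFormat.
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    env.executeSql(
            "CREATE TABLE people (name STRING, age INT) WITH ("
                    + " 'connector' = 'filesystem',"
                    + " 'path' = '/tmp/people',"
                    + " 'format' = 'testcsv')");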
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java
new file mode 100644
index 0000000..9fc6800
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.testcsv;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
+
+/** @see TestCsvFormatFactory */
+class TestCsvSerializationSchema implements SerializationSchema<RowData> {
+
+    private final DynamicTableSink.DataStructureConverter converter;
+
+    public TestCsvSerializationSchema(DynamicTableSink.DataStructureConverter converter) {
+        this.converter = converter;
+    }
+
+    @Override
+    public void open(InitializationContext context) throws Exception {}
+
+    @Override
+    public byte[] serialize(RowData element) {
+        Row row = (Row) converter.toExternal(element);
+        StringBuilder builder = new StringBuilder();
+        Object o;
+        for (int i = 0; i < row.getArity(); i++) {
+            if (i > 0) {
+                builder.append(DEFAULT_FIELD_DELIMITER);
+            }
+            if ((o = row.getField(i)) != null) {
+                builder.append(o);
+            }
+        }
+        String str = builder.toString();
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+}
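
The new schema delegates all type handling to the converter handed over by the factory. A small behavior sketch (converter and rowData are assumed to come from the sink context, as in createRuntimeEncoder above):

    // A RowData holding ("Bob", 42) becomes an external Row via the
    // converter, whose fields are joined with the default field delimiter.
    SerializationSchema<RowData> schema = new TestCsvSerializationSchema(converter);
    byte[] bytes = schema.serialize(rowData); // "Bob,42" as UTF-8

Note that, unlike the removed CsvBulkWriter, serialize() does not append DEFAULT_LINE_DELIMITER; record framing is now the concern of the surrounding writer.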
diff --git a/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7487804..0d3371a 100644
--- a/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.filesystem.TestCsvFileSystemFormatFactory
+org.apache.flink.formats.testcsv.TestCsvFormatFactory