You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2021/12/03 16:55:46 UTC
[flink] 07/08: [FLINK-24687][table-runtime] Refactored test csv format to be independent of planner (except ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter) and to implement SerializationSchema instead of BulkWriterFormatFactory. Moved to a specific package
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6bb090751093e4f9f8a05c80857af11381f88599
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Wed Nov 24 15:07:12 2021 +0100
[FLINK-24687][table-runtime] Refactored test csv format to be independent of planner (except ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter) and to implement SerializationSchema instead of BulkWriterFormatFactory. Moved to a specific package
Signed-off-by: slinkydeveloper <fr...@gmail.com>
---
.../table/filesystem/FileSystemTableSink.java | 5 +-
.../testcsv}/TestCsvDeserializationSchema.java | 35 +++++----
.../testcsv/TestCsvFormatFactory.java} | 89 +++++-----------------
.../testcsv/TestCsvSerializationSchema.java | 58 ++++++++++++++
.../org.apache.flink.table.factories.Factory | 2 +-
5 files changed, 104 insertions(+), 85 deletions(-)
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 2e9af35..bbd7425 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -343,7 +343,10 @@ public class FileSystemTableSink extends AbstractFileSystemTable
@Override
public DynamicTableSource.DataStructureConverter createDataStructureConverter(
DataType producedDataType) {
- throw new TableException("Compaction reader not support DataStructure converter.");
+ // This method cannot be implemented without changing the
+ // DynamicTableSink.DataStructureConverter interface
+ throw new UnsupportedOperationException(
+ "Compaction reader not support DataStructure converter.");
}
};
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
similarity index 83%
rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
index dbec987..1569e0d 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvDeserializationSchema.java
@@ -16,24 +16,22 @@
* limitations under the License.
*/
-package org.apache.flink.table.filesystem;
+package org.apache.flink.formats.testcsv;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.conversion.DataStructureConverter;
-import org.apache.flink.table.data.conversion.DataStructureConverters;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.parser.FieldParser;
import org.apache.flink.util.InstantiationUtil;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
+import java.util.function.Function;
/**
* The {@link DeserializationSchema} that output {@link RowData}.
@@ -41,7 +39,7 @@ import java.util.List;
* <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv
* parser! If you need a feature complete CSV parser, check out the flink-csv package.
*/
-public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
+class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
private final List<DataType> physicalFieldTypes;
private final int physicalFieldCount;
@@ -49,20 +47,34 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
private final TypeInformation<RowData> typeInfo;
private final int[] indexMapping;
- @SuppressWarnings("rawtypes")
- private transient DataStructureConverter[] csvRowToRowDataConverters;
+ private final DynamicTableSource.DataStructureConverter[] csvRowToRowDataConverters;
private transient FieldParser<?>[] fieldParsers;
- public TestCsvDeserializationSchema(DataType physicalDataType, List<String> orderedCsvColumns) {
+ public TestCsvDeserializationSchema(
+ DataType physicalDataType,
+ TypeInformation<RowData> typeInfo,
+ List<String> orderedCsvColumns,
+ Function<DataType, DynamicTableSource.DataStructureConverter> converterFactory) {
this.physicalFieldTypes = DataType.getFieldDataTypes(physicalDataType);
this.physicalFieldCount = physicalFieldTypes.size();
- this.typeInfo = InternalTypeInfo.of((RowType) physicalDataType.getLogicalType());
+ this.typeInfo = typeInfo;
List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType);
this.indexMapping =
orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray();
+ // Init data converters
+ int csvRowLength = indexMapping.length;
+ this.csvRowToRowDataConverters =
+ new DynamicTableSource.DataStructureConverter[csvRowLength];
+ for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
+ if (indexMapping[csvColumn] != -1) {
+ DataType fieldType = physicalFieldTypes.get(indexMapping[csvColumn]);
+ this.csvRowToRowDataConverters[csvColumn] = converterFactory.apply(fieldType);
+ }
+ }
+
initFieldParsers();
}
@@ -103,7 +115,6 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
private void initFieldParsers() {
int csvRowLength = indexMapping.length;
this.fieldParsers = new FieldParser<?>[csvRowLength];
- this.csvRowToRowDataConverters = new DataStructureConverter[csvRowLength];
for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
if (indexMapping[csvColumn] == -1) {
// The output type doesn't include this field, so just assign a string parser to
@@ -125,8 +136,6 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
this.fieldParsers[csvColumn] = p;
- this.csvRowToRowDataConverters[csvColumn] =
- DataStructureConverters.getConverter(fieldType);
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
similarity index 54%
rename from flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
rename to flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
index 5bae8f5..fa564c4 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvFormatFactory.java
@@ -16,10 +16,10 @@
* limitations under the License.
*/
-package org.apache.flink.table.filesystem;
+package org.apache.flink.formats.testcsv;
-import org.apache.flink.api.common.serialization.BulkWriter;
import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.ChangelogMode;
@@ -30,34 +30,23 @@ import org.apache.flink.table.connector.format.ProjectableDecodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.factories.BulkWriterFormatFactory;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.utils.TypeConversions;
-import org.apache.flink.types.Row;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
-import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
-import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
-
/**
* Factory for csv test format.
*
* <p>NOTE: This is meant only for testing purpose and doesn't provide a feature complete stable csv
* parser! If you need a feature complete CSV parser, check out the flink-csv package.
*/
-public class TestCsvFileSystemFormatFactory
- implements DeserializationFormatFactory, BulkWriterFormatFactory {
+public class TestCsvFormatFactory
+ implements DeserializationFormatFactory, SerializationFormatFactory {
@Override
public String factoryIdentifier() {
@@ -74,40 +63,17 @@ public class TestCsvFileSystemFormatFactory
return new HashSet<>();
}
- private static void writeCsvToStream(DataType[] types, RowData rowData, OutputStream stream)
- throws IOException {
- LogicalType[] fieldTypes =
- Arrays.stream(types).map(DataType::getLogicalType).toArray(LogicalType[]::new);
- DataFormatConverters.DataFormatConverter converter =
- DataFormatConverters.getConverterForDataType(
- TypeConversions.fromLogicalToDataType(RowType.of(fieldTypes)));
-
- Row row = (Row) converter.toExternal(rowData);
- StringBuilder builder = new StringBuilder();
- Object o;
- for (int i = 0; i < row.getArity(); i++) {
- if (i > 0) {
- builder.append(DEFAULT_FIELD_DELIMITER);
- }
- if ((o = row.getField(i)) != null) {
- builder.append(o);
- }
- }
- String str = builder.toString();
- stream.write(str.getBytes(StandardCharsets.UTF_8));
- stream.write(DEFAULT_LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));
- }
-
@Override
- public EncodingFormat<BulkWriter.Factory<RowData>> createEncodingFormat(
+ public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
- return new EncodingFormat<BulkWriter.Factory<RowData>>() {
+ return new EncodingFormat<SerializationSchema<RowData>>() {
@Override
- public BulkWriter.Factory<RowData> createRuntimeEncoder(
+ public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
- return out ->
- new CsvBulkWriter(
- consumedDataType.getChildren().toArray(new DataType[0]), out);
+ DynamicTableSink.DataStructureConverter converter =
+ context.createDataStructureConverter(consumedDataType);
+
+ return new TestCsvSerializationSchema(converter);
}
@Override
@@ -129,7 +95,12 @@ public class TestCsvFileSystemFormatFactory
DataType projectedPhysicalDataType =
Projection.of(projections).project(physicalDataType);
return new TestCsvDeserializationSchema(
- projectedPhysicalDataType, DataType.getFieldNames(physicalDataType));
+ projectedPhysicalDataType,
+ context.createTypeInformation(projectedPhysicalDataType),
+ DataType.getFieldNames(physicalDataType),
+ // Check out the FileSystemTableSink#createSourceContext for more details on
+ // why we need this
+ ScanRuntimeProviderContext.INSTANCE::createDataStructureConverter);
}
@Override
@@ -138,26 +109,4 @@ public class TestCsvFileSystemFormatFactory
}
};
}
-
- private static class CsvBulkWriter implements BulkWriter<RowData> {
-
- private final DataType[] types;
- private final OutputStream stream;
-
- private CsvBulkWriter(DataType[] types, OutputStream stream) {
- this.types = types;
- this.stream = stream;
- }
-
- @Override
- public void addElement(RowData element) throws IOException {
- writeCsvToStream(types, element, stream);
- }
-
- @Override
- public void flush() {}
-
- @Override
- public void finish() {}
- }
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java
new file mode 100644
index 0000000..9fc6800
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/formats/testcsv/TestCsvSerializationSchema.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.testcsv;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.Row;
+
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
+
+/** @see TestCsvFormatFactory */
+class TestCsvSerializationSchema implements SerializationSchema<RowData> {
+
+ private final DynamicTableSink.DataStructureConverter converter;
+
+ public TestCsvSerializationSchema(DynamicTableSink.DataStructureConverter converter) {
+ this.converter = converter;
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {}
+
+ @Override
+ public byte[] serialize(RowData element) {
+ Row row = (Row) converter.toExternal(element);
+ StringBuilder builder = new StringBuilder();
+ Object o;
+ for (int i = 0; i < row.getArity(); i++) {
+ if (i > 0) {
+ builder.append(DEFAULT_FIELD_DELIMITER);
+ }
+ if ((o = row.getField(i)) != null) {
+ builder.append(o);
+ }
+ }
+ String str = builder.toString();
+ return str.getBytes(StandardCharsets.UTF_8);
+ }
+}
diff --git a/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 7487804..0d3371a 100644
--- a/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-runtime/src/test/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.flink.table.filesystem.TestCsvFileSystemFormatFactory
+org.apache.flink.formats.testcsv.TestCsvFormatFactory