Posted to commits@flink.apache.org by tw...@apache.org on 2021/11/09 14:44:16 UTC
[flink] branch master updated: [FLINK-24615][table] Add infrastructure to support metadata for filesystem
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 1ea2102 [FLINK-24615][table] Add infrastructure to support metadata for filesystem
1ea2102 is described below
commit 1ea210278f1da41f193a7520306cab20596fd10f
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Tue Oct 19 17:31:29 2021 +0200
[FLINK-24615][table] Add infrastructure to support metadata for filesystem
Signed-off-by: slinkydeveloper <fr...@gmail.com>
This closes #17544.
---
docs/content/docs/connectors/table/filesystem.md | 35 +++
.../util/RecordMapperWrapperRecordIterator.java | 64 ++++
.../flink/formats/avro/AvroFilesystemITCase.java | 5 +
.../formats/csv/CsvFilesystemBatchITCase.java | 10 +
.../formats/csv/CsvFilesystemStreamITCase.java | 5 +
.../flink/table/data/utils/EnrichedRowData.java | 323 +++++++++++++++++++++
.../table/data/utils/EnrichedRowDataTest.java | 75 +++++
.../filesystem/FileSystemTableSourceTest.java | 18 ++
.../table/filesystem/FileSystemTableSourceTest.xml | 19 ++
.../planner/runtime/FileSystemITCaseBase.scala | 99 ++++++-
.../batch/sql/BatchFileSystemITCaseBase.scala | 19 +-
.../stream/sql/StreamFileSystemITCaseBase.scala | 30 +-
.../stream/sql/StreamFileSystemTestCsvITCase.scala | 2 +
.../table/filesystem/AbstractFileSystemTable.java | 14 +-
.../filesystem/DeserializationSchemaAdapter.java | 106 +------
.../filesystem/FileInfoExtractorBulkFormat.java | 200 +++++++++++++
.../table/filesystem/FileSystemTableSink.java | 46 ++-
.../table/filesystem/FileSystemTableSource.java | 236 ++++++++++++---
.../filesystem/TestCsvDeserializationSchema.java | 60 ++--
.../filesystem/TestCsvFileSystemFormatFactory.java | 17 +-
20 files changed, 1180 insertions(+), 203 deletions(-)
diff --git a/docs/content/docs/connectors/table/filesystem.md b/docs/content/docs/connectors/table/filesystem.md
index 7f143b3..82ba012 100644
--- a/docs/content/docs/connectors/table/filesystem.md
+++ b/docs/content/docs/connectors/table/filesystem.md
@@ -108,6 +108,41 @@ The file system connector can be used to read single files or entire directories
When using a directory as the source path, there is **no defined order of ingestion** for the files inside the directory.
+### Available Metadata
+
+The following connector metadata can be accessed as metadata columns in a table definition. All metadata columns are read-only.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Key</th>
+ <th class="text-center" style="width: 30%">Data Type</th>
+ <th class="text-center" style="width: 40%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>filepath</code></td>
+ <td><code>STRING NOT NULL</code></td>
+ <td>Full path of the input file.</td>
+ </tr>
+ </tbody>
+</table>
+
+The following extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE MyUserTableWithFilepath (
+ column_name1 INT,
+ column_name2 STRING,
+ filepath STRING NOT NULL METADATA
+) WITH (
+ 'connector' = 'filesystem',
+ 'path' = 'file:///path/to/whatever',
+ 'format' = 'json'
+)
+```
+
## Streaming Sink
The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}})
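[Editor's note] To illustrate the documentation added above: once declared, the metadata column can be selected and filtered like any other read-only column. A minimal sketch, assuming the `MyUserTableWithFilepath` definition from the example (the predicate value is hypothetical):

```sql
-- Query the exposed metadata column like a regular column.
SELECT column_name1, column_name2, filepath
FROM MyUserTableWithFilepath
WHERE filepath LIKE 'file:///path/to/whatever/%';
```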
diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
new file mode 100644
index 0000000..76c4fb0
--- /dev/null
+++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/util/RecordMapperWrapperRecordIterator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.src.util;
+
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+
+/**
+ * Implementation of {@link org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator}
+ * that wraps another iterator and performs the mapping of the records.
+ *
+ * @param <I> Input type
+ * @param <O> Mapped output type
+ */
+public class RecordMapperWrapperRecordIterator<I, O> implements BulkFormat.RecordIterator<O> {
+
+ /** Record mapper definition. */
+ @FunctionalInterface
+ public interface RecordMapper<I, O> {
+ /** Map the record. Both input value and output value are expected to be non-null. */
+ O map(I in);
+ }
+
+ private final BulkFormat.RecordIterator<I> wrapped;
+ private final RecordMapper<I, O> mapper;
+
+ public RecordMapperWrapperRecordIterator(
+ BulkFormat.RecordIterator<I> wrapped, RecordMapper<I, O> mapper) {
+ this.wrapped = wrapped;
+ this.mapper = mapper;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ @Override
+ public RecordAndPosition<O> next() {
+ RecordAndPosition record = this.wrapped.next();
+ if (record == null || record.getRecord() == null) {
+ return (RecordAndPosition<O>) record;
+ }
+
+ record.record = mapper.map((I) record.record);
+ return (RecordAndPosition<O>) record;
+ }
+
+ @Override
+ public void releaseBatch() {
+ this.wrapped.releaseBatch();
+ }
+}
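[Editor's note] A usage sketch for the wrapper above (not part of this patch): a toy single-record iterator is mapped record by record. It assumes only the public `RecordAndPosition(record, offset, recordSkipCount)` constructor and `getRecord()` accessor.

import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;

/** Sketch: map a toy single-record iterator of Strings to their lengths. */
final class RecordMapperSketch {

    /** Toy iterator standing in for one produced by a real BulkFormat reader. */
    static BulkFormat.RecordIterator<String> singleRecord(String value) {
        return new BulkFormat.RecordIterator<String>() {
            private RecordAndPosition<String> next = new RecordAndPosition<>(value, 0L, 1L);

            @Override
            public RecordAndPosition<String> next() {
                RecordAndPosition<String> current = next;
                next = null; // a single record, then end of batch
                return current;
            }

            @Override
            public void releaseBatch() {
                // nothing to release in this toy iterator
            }
        };
    }

    public static void main(String[] args) {
        BulkFormat.RecordIterator<Integer> lengths =
                new RecordMapperWrapperRecordIterator<>(singleRecord("hello"), String::length);
        System.out.println(lengths.next().getRecord()); // prints 5
    }
}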
diff --git a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java
index e7a8029..b89c2be 100644
--- a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java
+++ b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemITCase.java
@@ -46,6 +46,11 @@ public class AvroFilesystemITCase extends BatchFileSystemITCaseBase {
}
@Override
+ public boolean supportsReadingMetadata() {
+ return false;
+ }
+
+ @Override
public String[] formatProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='avro'");
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java
index 8df8ddb..f6dadc5 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemBatchITCase.java
@@ -40,6 +40,11 @@ public class CsvFilesystemBatchITCase {
public static class GeneralCsvFilesystemBatchITCase extends BatchFileSystemITCaseBase {
@Override
+ public boolean supportsReadingMetadata() {
+ return false;
+ }
+
+ @Override
public String[] formatProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='csv'");
@@ -56,6 +61,11 @@ public class CsvFilesystemBatchITCase {
public static class EnrichedCsvFilesystemBatchITCase extends BatchFileSystemITCaseBase {
@Override
+ public boolean supportsReadingMetadata() {
+ return false;
+ }
+
+ @Override
public String[] formatProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='csv'");
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
index 1070682..aa41694 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
@@ -27,6 +27,11 @@ import java.util.List;
public class CsvFilesystemStreamITCase extends StreamFileSystemITCaseBase {
@Override
+ public boolean supportsReadingMetadata() {
+ return false;
+ }
+
+ @Override
public String[] formatProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='csv'");
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/EnrichedRowData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/EnrichedRowData.java
new file mode 100644
index 0000000..1743563
--- /dev/null
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/utils/EnrichedRowData.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.data.ArrayData;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.types.RowKind;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * An implementation of {@link RowData} which is backed by two {@link RowData} with a well-defined
+ * index mapping. One of the rows is fixed, while the other can be swapped cheaply in hot code
+ * paths for performance. The {@link RowKind} is inherited from the mutable row.
+@PublicEvolving
+public class EnrichedRowData implements RowData {
+
+ private final RowData fixedRow;
+ // The index mapping is built as follows: positive indexes refer to mutable row positions,
+ // while negative indexes (with -1 offset) refer to fixed row positions.
+ // For example an index mapping [0, 1, -1, -2, 2] means:
+ // * Index 0 -> mutable row index 0
+ // * Index 1 -> mutable row index 1
+ // * Index -1 -> fixed row index 0
+ // * Index -2 -> fixed row index 1
+ // * Index 2 -> mutable row index 2
+ private final int[] indexMapping;
+
+ private RowData mutableRow;
+
+ public EnrichedRowData(RowData fixedRow, int[] indexMapping) {
+ this.fixedRow = fixedRow;
+ this.indexMapping = indexMapping;
+ }
+
+ /**
+ * Replaces the mutable {@link RowData} backing this {@link EnrichedRowData}.
+ *
+ * <p>This method replaces the mutable row data in place and does not return a new object. This
+ * is done for performance reasons.
+ */
+ public EnrichedRowData replaceMutableRow(RowData mutableRow) {
+ this.mutableRow = mutableRow;
+ return this;
+ }
+
+ // ---------------------------------------------------------------------------------------------
+
+ @Override
+ public int getArity() {
+ return indexMapping.length;
+ }
+
+ @Override
+ public RowKind getRowKind() {
+ return mutableRow.getRowKind();
+ }
+
+ @Override
+ public void setRowKind(RowKind kind) {
+ mutableRow.setRowKind(kind);
+ }
+
+ @Override
+ public boolean isNullAt(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.isNullAt(index);
+ } else {
+ return fixedRow.isNullAt(-(index + 1));
+ }
+ }
+
+ @Override
+ public boolean getBoolean(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getBoolean(index);
+ } else {
+ return fixedRow.getBoolean(-(index + 1));
+ }
+ }
+
+ @Override
+ public byte getByte(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getByte(index);
+ } else {
+ return fixedRow.getByte(-(index + 1));
+ }
+ }
+
+ @Override
+ public short getShort(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getShort(index);
+ } else {
+ return fixedRow.getShort(-(index + 1));
+ }
+ }
+
+ @Override
+ public int getInt(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getInt(index);
+ } else {
+ return fixedRow.getInt(-(index + 1));
+ }
+ }
+
+ @Override
+ public long getLong(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getLong(index);
+ } else {
+ return fixedRow.getLong(-(index + 1));
+ }
+ }
+
+ @Override
+ public float getFloat(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getFloat(index);
+ } else {
+ return fixedRow.getFloat(-(index + 1));
+ }
+ }
+
+ @Override
+ public double getDouble(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getDouble(index);
+ } else {
+ return fixedRow.getDouble(-(index + 1));
+ }
+ }
+
+ @Override
+ public StringData getString(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getString(index);
+ } else {
+ return fixedRow.getString(-(index + 1));
+ }
+ }
+
+ @Override
+ public DecimalData getDecimal(int pos, int precision, int scale) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getDecimal(index, precision, scale);
+ } else {
+ return fixedRow.getDecimal(-(index + 1), precision, scale);
+ }
+ }
+
+ @Override
+ public TimestampData getTimestamp(int pos, int precision) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getTimestamp(index, precision);
+ } else {
+ return fixedRow.getTimestamp(-(index + 1), precision);
+ }
+ }
+
+ @Override
+ public <T> RawValueData<T> getRawValue(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getRawValue(index);
+ } else {
+ return fixedRow.getRawValue(-(index + 1));
+ }
+ }
+
+ @Override
+ public byte[] getBinary(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getBinary(index);
+ } else {
+ return fixedRow.getBinary(-(index + 1));
+ }
+ }
+
+ @Override
+ public ArrayData getArray(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getArray(index);
+ } else {
+ return fixedRow.getArray(-(index + 1));
+ }
+ }
+
+ @Override
+ public MapData getMap(int pos) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getMap(index);
+ } else {
+ return fixedRow.getMap(-(index + 1));
+ }
+ }
+
+ @Override
+ public RowData getRow(int pos, int numFields) {
+ int index = indexMapping[pos];
+ if (index >= 0) {
+ return mutableRow.getRow(index, numFields);
+ } else {
+ return fixedRow.getRow(-(index + 1), numFields);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EnrichedRowData that = (EnrichedRowData) o;
+ return Objects.equals(this.fixedRow, that.fixedRow)
+ && Objects.equals(this.mutableRow, that.mutableRow);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fixedRow, mutableRow);
+ }
+
+ @Override
+ public String toString() {
+ return mutableRow.getRowKind().shortString()
+ + "{"
+ + "fixedRow="
+ + fixedRow
+ + ", mutableRow="
+ + mutableRow
+ + '}';
+ }
+
+ /**
+ * Creates a new {@link EnrichedRowData} with the provided {@code fixedRow} as the immutable
+ * static row, and uses the {@code producedRowFields}, {@code fixedRowFields} and {@code
+ * mutableRowFields} arguments to compute the index mapping.
+ *
+ * <p>{@code producedRowFields} should contain the field names of the full row once the mutable
+ * and fixed rows are merged, while {@code mutableRowFields} and {@code fixedRowFields} should
+ * contain the field names of the mutable row and the fixed row, respectively. All lists are
+ * ordered, with indexes matching the position of each field in its row. For example, for a
+ * complete row {@code (a, b, c)} the mutable row might be {@code (a, c)} and the fixed row
+ * might be {@code (b)}.
+ */
+ public static EnrichedRowData from(
+ RowData fixedRow,
+ List<String> producedRowFields,
+ List<String> mutableRowFields,
+ List<String> fixedRowFields) {
+ return new EnrichedRowData(
+ fixedRow, computeIndexMapping(producedRowFields, mutableRowFields, fixedRowFields));
+ }
+
+ /**
+ * This method computes the index mapping for {@link EnrichedRowData}.
+ *
+ * @see EnrichedRowData#from(RowData, List, List, List)
+ */
+ public static int[] computeIndexMapping(
+ List<String> producedRowFields,
+ List<String> mutableRowFields,
+ List<String> fixedRowFields) {
+ int[] indexMapping = new int[producedRowFields.size()];
+
+ for (int i = 0; i < producedRowFields.size(); i++) {
+ String fieldName = producedRowFields.get(i);
+
+ int newIndex = mutableRowFields.indexOf(fieldName);
+ if (newIndex < 0) {
+ newIndex = -(fixedRowFields.indexOf(fieldName) + 1);
+ }
+
+ indexMapping[i] = newIndex;
+ }
+
+ return indexMapping;
+ }
+}
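[Editor's note] To make the index-mapping convention concrete, a small sketch (not part of this patch) using the `(a, b, c)` example from the Javadoc above:

import java.util.Arrays;
import java.util.List;

import org.apache.flink.table.data.utils.EnrichedRowData;

/** Sketch: positive indexes address the mutable row, negative ones the fixed row. */
final class IndexMappingSketch {
    public static void main(String[] args) {
        List<String> produced = Arrays.asList("a", "b", "c");
        List<String> mutable = Arrays.asList("a", "c"); // e.g. physical columns
        List<String> fixed = Arrays.asList("b"); // e.g. metadata/partition columns

        // "a" -> mutable index 0, "b" -> fixed index 0 (encoded as -1),
        // "c" -> mutable index 1
        int[] mapping = EnrichedRowData.computeIndexMapping(produced, mutable, fixed);
        System.out.println(Arrays.toString(mapping)); // prints [0, -1, 1]
    }
}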
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/EnrichedRowDataTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/EnrichedRowDataTest.java
new file mode 100644
index 0000000..70a42f9
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/utils/EnrichedRowDataTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.utils;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/** Tests for {@link EnrichedRowData}. */
+public class EnrichedRowDataTest {
+
+ @Test
+ public void testJoinedRows() {
+ final List<String> completeRowFields =
+ Arrays.asList(
+ "fixedRow1",
+ "mutableRow1",
+ "mutableRow3",
+ "mutableRow2",
+ "fixedRow2",
+ "mutableRow4");
+ final List<String> mutableRowFields =
+ Arrays.asList("mutableRow1", "mutableRow2", "mutableRow3", "mutableRow4");
+ final List<String> fixedRowFields = Arrays.asList("fixedRow1", "fixedRow2");
+
+ final RowData fixedRowData = GenericRowData.of(1L, 2L);
+ final EnrichedRowData enrichedRowData =
+ EnrichedRowData.from(
+ fixedRowData, completeRowFields, mutableRowFields, fixedRowFields);
+ final RowData mutableRowData = GenericRowData.of(3L, 4L, 5L, 6L);
+ enrichedRowData.replaceMutableRow(mutableRowData);
+
+ assertEquals(RowKind.INSERT, enrichedRowData.getRowKind());
+ assertEquals(6, enrichedRowData.getArity());
+ assertEquals(1L, enrichedRowData.getLong(0));
+ assertEquals(3L, enrichedRowData.getLong(1));
+ assertEquals(5L, enrichedRowData.getLong(2));
+ assertEquals(4L, enrichedRowData.getLong(3));
+ assertEquals(2L, enrichedRowData.getLong(4));
+ assertEquals(6L, enrichedRowData.getLong(5));
+
+ final RowData newMutableRowData = GenericRowData.of(7L, 8L, 9L, 10L);
+ enrichedRowData.replaceMutableRow(newMutableRowData);
+
+ assertEquals(1L, enrichedRowData.getLong(0));
+ assertEquals(7L, enrichedRowData.getLong(1));
+ assertEquals(9L, enrichedRowData.getLong(2));
+ assertEquals(8L, enrichedRowData.getLong(3));
+ assertEquals(2L, enrichedRowData.getLong(4));
+ assertEquals(10L, enrichedRowData.getLong(5));
+ }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java
index 0827988..958e66f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/filesystem/FileSystemTableSourceTest.java
@@ -47,6 +47,18 @@ public class FileSystemTableSourceTest extends TableTestBase {
+ " 'path' = '/tmp')";
tEnv.executeSql(srcTableDdl);
+ String srcTableWithMetaDdl =
+ "CREATE TABLE MyTableWithMeta (\n"
+ + " a bigint,\n"
+ + " b int,\n"
+ + " c varchar,\n"
+ + " filemeta STRING METADATA FROM 'filepath'\n"
+ + ") with (\n"
+ + " 'connector' = 'filesystem',"
+ + " 'format' = 'testcsv',"
+ + " 'path' = '/tmp')";
+ tEnv.executeSql(srcTableWithMetaDdl);
+
String sinkTableDdl =
"CREATE TABLE MySink (\n"
+ " a bigint,\n"
@@ -62,4 +74,10 @@ public class FileSystemTableSourceTest extends TableTestBase {
public void testFilterPushDown() {
util.verifyRelPlanInsert("insert into MySink select * from MyTable where a > 10");
}
+
+ @Test
+ public void testMetadataReading() {
+ util.verifyRelPlanInsert(
+ "insert into MySink(a, b, c) select a, b, filemeta from MyTableWithMeta");
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml
index 97cde4f..087251c 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/filesystem/FileSystemTableSourceTest.xml
@@ -36,4 +36,23 @@ Sink(table=[default_catalog.default_database.MySink], fields=[a, b, c])
]]>
</Resource>
</TestCase>
+ <TestCase name="testMetadataReading">
+ <Resource name="sql">
+ <![CDATA[insert into MySink(a, b, c) select a, b, filemeta from MyTableWithMeta]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalSink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
++- LogicalProject(a=[$0], b=[$1], filemeta=[CAST($3):VARCHAR(2147483647) CHARACTER SET "UTF-16LE"])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTableWithMeta]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+Sink(table=[default_catalog.default_database.MySink], fields=[a, b, filemeta])
++- Calc(select=[a, b, CAST(filepath) AS filemeta])
+ +- TableSourceScan(table=[[default_catalog, default_database, MyTableWithMeta, project=[a, b], metadata=[filepath]]], fields=[a, b, filepath])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
index d80e5b4..85a81d4 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/FileSystemITCaseBase.scala
@@ -27,13 +27,11 @@ import org.apache.flink.table.planner.runtime.FileSystemITCaseBase._
import org.apache.flink.table.planner.runtime.utils.BatchTableEnvUtil
import org.apache.flink.table.planner.runtime.utils.BatchTestBase.row
import org.apache.flink.types.Row
-
-import org.junit.Assert.assertTrue
+import org.junit.Assert.{assertEquals, assertNotNull, assertTrue}
import org.junit.rules.TemporaryFolder
import org.junit.{Rule, Test}
import java.io.File
-
import scala.collection.{JavaConverters, Seq}
/**
@@ -51,6 +49,8 @@ trait FileSystemITCaseBase {
def tableEnv: TableEnvironment
+ def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit
+
def check(sqlQuery: String, expectedResult: Seq[Row]): Unit
def check(sqlQuery: String, expectedResult: java.util.List[Row]): Unit = {
@@ -58,6 +58,8 @@ trait FileSystemITCaseBase {
JavaConverters.asScalaIteratorConverter(expectedResult.iterator()).asScala.toSeq)
}
+ def supportsReadingMetadata: Boolean = true
+
def open(): Unit = {
resultPath = fileTmpFolder.newFolder().toURI.toString
BatchTableEnvUtil.registerCollection(
@@ -81,6 +83,24 @@ trait FileSystemITCaseBase {
|)
""".stripMargin
)
+ if (supportsReadingMetadata) {
+ tableEnv.executeSql(
+ s"""
+ |create table partitionedTableWithMetadata (
+ | x string,
+ | y int,
+ | a int,
+ | b bigint,
+ | c as b + 1,
+ | f string metadata from 'filepath'
+ |) partitioned by (a, b) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$resultPath',
+ | ${formatProperties().mkString(",\n")}
+ |)
+ """.stripMargin
+ )
+ }
tableEnv.executeSql(
s"""
|create table nonPartitionedTable (
@@ -95,6 +115,23 @@ trait FileSystemITCaseBase {
|)
""".stripMargin
)
+ if (supportsReadingMetadata) {
+ tableEnv.executeSql(
+ s"""
+ |create table nonPartitionedTableWithMetadata (
+ | x string,
+ | y int,
+ | a int,
+ | f string metadata from 'filepath',
+ | b bigint
+ |) with (
+ | 'connector' = 'filesystem',
+ | 'path' = '$resultPath',
+ | ${formatProperties().mkString(",\n")}
+ |)
+ """.stripMargin
+ )
+ }
tableEnv.executeSql(
s"""
@@ -184,6 +221,40 @@ trait FileSystemITCaseBase {
}
@Test
+ def testAllStaticPartitionsWithMetadata(): Unit = {
+ if (!supportsReadingMetadata) {
+ return
+ }
+
+ tableEnv.executeSql("insert into partitionedTable " +
+ "partition(a='1', b='1') select x, y from originalT where a=1 and b=1").await()
+
+ checkPredicate(
+ "select x, f, y from partitionedTableWithMetadata where a=1 and b=1",
+ row => {
+ assertEquals(3, row.getArity)
+ assertNotNull(row.getField("f"))
+ assertNotNull(row.getField(1))
+ assertTrue(
+ "The filepath value should begin with the temporary test path",
+ row.getFieldAs[String](1).contains(fileTmpFolder.getRoot.getPath))
+ }
+ )
+
+ checkPredicate(
+ "select x, f, y from partitionedTableWithMetadata",
+ row => {
+ assertEquals(3, row.getArity)
+ assertNotNull(row.getField("f"))
+ assertNotNull(row.getField(1))
+ assertTrue(
+ "The filepath value should begin with the temporary test path",
+ row.getFieldAs[String](1).contains(fileTmpFolder.getRoot.getPath))
+ }
+ )
+ }
+
+ @Test
def testPartialDynamicPartition(): Unit = {
tableEnv.executeSql("insert into partitionedTable " +
"partition(a=3) select x, y, b from originalT where a=3").await()
@@ -276,6 +347,28 @@ trait FileSystemITCaseBase {
}
@Test
+ def testNonPartitionWithMetadata(): Unit = {
+ if (!supportsReadingMetadata) {
+ return
+ }
+
+ tableEnv.executeSql("insert into nonPartitionedTable " +
+ "select x, y, a, b from originalT where a=1 and b=1").await()
+
+ checkPredicate(
+ "select x, f, y from nonPartitionedTableWithMetadata where a=1 and b=1",
+ row => {
+ assertEquals(3, row.getArity)
+ assertNotNull(row.getField("f"))
+ assertNotNull(row.getField(1))
+ assertTrue(
+ "The filepath value should begin with the temporary test path",
+ row.getFieldAs[String](1).contains(fileTmpFolder.getRoot.getPath))
+ }
+ )
+ }
+
+ @Test
def testLimitPushDown(): Unit = {
tableEnv.getConfig.getConfiguration.setInteger(
ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1)
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
index b7d923c..86797e1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/BatchFileSystemITCaseBase.scala
@@ -22,9 +22,9 @@ import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
import org.apache.flink.table.planner.runtime.utils.BatchTestBase
import org.apache.flink.types.Row
+import org.junit.{Assert, Before}
-import org.junit.Before
-
+import java.lang.AssertionError
import scala.collection.Seq
/**
@@ -45,4 +45,19 @@ abstract class BatchFileSystemITCaseBase extends BatchTestBase with FileSystemIT
override def check(sqlQuery: String, expectedResult: Seq[Row]): Unit = {
checkResult(sqlQuery, expectedResult)
}
+
+ override def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit = {
+ val table = parseQuery(sqlQuery)
+ val result = executeQuery(table)
+
+ try {
+ result.foreach(checkFunc)
+ } catch {
+ case e: AssertionError => throw new AssertionError(
+ s"""
+ |Results do not match for query:
+ | $sqlQuery
+ """.stripMargin, e)
+ }
+ }
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
index 5b78fee..a0fd403 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemITCaseBase.scala
@@ -19,16 +19,16 @@
package org.apache.flink.table.planner.runtime.stream.sql
import org.apache.flink.api.scala._
+import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.planner.runtime.FileSystemITCaseBase
-import org.apache.flink.table.planner.runtime.utils.{StreamingTestBase, TestSinkUtil, TestingAppendSink}
+import org.apache.flink.table.planner.runtime.utils.{AbstractExactlyOnceSink, StreamingTestBase, TestSinkUtil, TestingAppendSink}
import org.apache.flink.types.Row
-
import org.junit.Assert.assertEquals
-import org.junit.{Before, Test}
+import org.junit.{Assert, Before, Test}
-import scala.collection.Seq
+import scala.collection.{Seq, mutable}
/**
* Streaming [[FileSystemITCaseBase]].
@@ -56,6 +56,28 @@ abstract class StreamFileSystemITCaseBase extends StreamingTestBase with FileSys
sink.getAppendResults.sorted)
}
+ override def checkPredicate(sqlQuery: String, checkFunc: Row => Unit): Unit = {
+ val result = tEnv.sqlQuery(sqlQuery).toDataStream
+ val sinkResults = new mutable.MutableList[Row]
+
+ val sink = new AbstractExactlyOnceSink[Row] {
+ override def invoke(value: Row, context: SinkFunction.Context): Unit =
+ sinkResults += value
+ }
+ result.addSink(sink)
+ env.execute()
+
+ try {
+ sinkResults.foreach(checkFunc)
+ } catch {
+ case e: AssertionError => throw new AssertionError(
+ s"""
+ |Results do not match for query:
+ | $sqlQuery
+ """.stripMargin, e)
+ }
+ }
+
// Streaming mode not support overwrite
@Test
override def testInsertOverwrite(): Unit = {}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
index 4ead229..065abd1 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/StreamFileSystemTestCsvITCase.scala
@@ -18,6 +18,8 @@
package org.apache.flink.table.planner.runtime.stream.sql
+import org.apache.flink.types.Row
+
import scala.collection.Seq
/**
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
index c8bcdd3..b96a371 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import java.util.List;
+import java.util.stream.Collectors;
import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.PARTITION_DEFAULT_NAME;
import static org.apache.flink.table.filesystem.FileSystemConnectorOptions.PATH;
@@ -40,19 +41,21 @@ abstract class AbstractFileSystemTable {
final ObjectIdentifier tableIdentifier;
final Configuration tableOptions;
final ResolvedSchema schema;
- final List<String> partitionKeys;
final Path path;
final String defaultPartName;
+ List<String> partitionKeys;
+
AbstractFileSystemTable(DynamicTableFactory.Context context) {
this.context = context;
this.tableIdentifier = context.getObjectIdentifier();
this.tableOptions = new Configuration();
context.getCatalogTable().getOptions().forEach(tableOptions::setString);
this.schema = context.getCatalogTable().getResolvedSchema();
- this.partitionKeys = context.getCatalogTable().getPartitionKeys();
this.path = new Path(tableOptions.get(PATH));
this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
+
+ this.partitionKeys = context.getCatalogTable().getPartitionKeys();
}
ReadableConfig formatOptions(String identifier) {
@@ -64,9 +67,8 @@ abstract class AbstractFileSystemTable {
}
DataType getPhysicalDataTypeWithoutPartitionColumns() {
- return DataTypes.ROW(
- DataType.getFields(getPhysicalDataType()).stream()
- .filter(field -> !partitionKeys.contains(field.getName()))
- .toArray(DataTypes.Field[]::new));
+ return DataType.getFields(getPhysicalDataType()).stream()
+ .filter(field -> !partitionKeys.contains(field.getName()))
+ .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
}
}
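[Editor's note] The refactored helper above rebuilds a ROW type from the non-partition fields in a single stream pass. A standalone sketch of the same pattern (hypothetical field names; assumes the `DataTypes.ROW(List<Field>)` overload the patch itself relies on):

import java.util.stream.Collectors;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

/** Sketch: drop a partition column from a physical row type. */
final class DropPartitionColumnsSketch {
    public static void main(String[] args) {
        DataType physical =
                DataTypes.ROW(
                        DataTypes.FIELD("x", DataTypes.STRING()),
                        DataTypes.FIELD("a", DataTypes.INT()));
        // Filter out the hypothetical partition key "a", then rebuild a ROW type
        // from the remaining fields via collectingAndThen.
        DataType withoutPartitions =
                DataType.getFields(physical).stream()
                        .filter(field -> !"a".equals(field.getName()))
                        .collect(
                                Collectors.collectingAndThen(
                                        Collectors.toList(), DataTypes::ROW));
        System.out.println(withoutPartitions); // e.g. ROW<`x` STRING>
    }
}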
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
index 68fae2b..73851fa 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/DeserializationSchemaAdapter.java
@@ -28,14 +28,7 @@ import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.utils.PartitionPathUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.UserCodeClassLoader;
@@ -45,10 +38,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
import java.util.Queue;
-import java.util.stream.Collectors;
import static org.apache.flink.connector.file.src.util.CheckpointedPosition.NO_OFFSET;
import static org.apache.flink.table.data.vector.VectorizedColumnBatch.DEFAULT_SIZE;
@@ -58,71 +48,10 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
private static final int BATCH_SIZE = 100;
- // NOTE, deserializationSchema produce full format fields with original order
private final DeserializationSchema<RowData> deserializationSchema;
- private final String[] fieldNames;
- private final DataType[] fieldTypes;
- private final int[] projectFields;
- private final RowType projectedRowType;
-
- private final List<String> partitionKeys;
- private final String defaultPartValue;
-
- private final int[] toProjectedField;
- private final RowData.FieldGetter[] formatFieldGetters;
-
- public DeserializationSchemaAdapter(
- DeserializationSchema<RowData> deserializationSchema,
- DataType physicalDataType,
- int[] projectFields,
- List<String> partitionKeys,
- String defaultPartValue) {
+ public DeserializationSchemaAdapter(DeserializationSchema<RowData> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
- this.fieldNames = DataType.getFieldNames(physicalDataType).toArray(new String[0]);
- this.fieldTypes = DataType.getFieldDataTypes(physicalDataType).toArray(new DataType[0]);
- this.projectFields = projectFields;
- this.partitionKeys = partitionKeys;
- this.defaultPartValue = defaultPartValue;
-
- List<String> projectedNames =
- Arrays.stream(projectFields)
- .mapToObj(idx -> this.fieldNames[idx])
- .collect(Collectors.toList());
-
- this.projectedRowType =
- RowType.of(
- Arrays.stream(projectFields)
- .mapToObj(idx -> this.fieldTypes[idx].getLogicalType())
- .toArray(LogicalType[]::new),
- projectedNames.toArray(new String[0]));
-
- List<String> formatFields =
- Arrays.stream(this.fieldNames)
- .filter(field -> !partitionKeys.contains(field))
- .collect(Collectors.toList());
-
- List<String> formatProjectedFields =
- projectedNames.stream()
- .filter(field -> !partitionKeys.contains(field))
- .collect(Collectors.toList());
-
- this.toProjectedField =
- formatProjectedFields.stream().mapToInt(projectedNames::indexOf).toArray();
-
- this.formatFieldGetters = new RowData.FieldGetter[formatProjectedFields.size()];
- final Map<String, DataType> fieldDataTypesMap =
- DataType.getFields(physicalDataType).stream()
- .collect(
- Collectors.toMap(
- DataTypes.Field::getName, DataTypes.Field::getDataType));
- for (int i = 0; i < formatProjectedFields.size(); i++) {
- String name = formatProjectedFields.get(i);
- this.formatFieldGetters[i] =
- RowData.createFieldGetter(
- fieldDataTypesMap.get(name).getLogicalType(),
- formatFields.indexOf(name));
- }
}
private DeserializationSchema<RowData> createDeserialization() throws IOException {
@@ -168,7 +97,7 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
@Override
public TypeInformation<RowData> getProducedType() {
- return InternalTypeInfo.of(projectedRowType);
+ return deserializationSchema.getProducedType();
}
private class Reader implements BulkFormat.Reader<RowData> {
@@ -233,7 +162,6 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
private transient boolean end;
private transient RecordCollector collector;
- private transient GenericRowData rowData;
public LineBytesInputFormat(Path path, Configuration config) throws IOException {
super(path, config);
@@ -245,22 +173,6 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
super.open(split);
this.end = false;
this.collector = new RecordCollector();
- this.rowData =
- PartitionPathUtils.fillPartitionValueForRecord(
- fieldNames,
- fieldTypes,
- projectFields,
- partitionKeys,
- currentSplit.getPath(),
- defaultPartValue);
- }
-
- private GenericRowData newOutputRow() {
- GenericRowData row = new GenericRowData(rowData.getArity());
- for (int i = 0; i < row.getArity(); i++) {
- row.setField(i, rowData.getField(i));
- }
- return row;
}
@Override
@@ -284,24 +196,12 @@ public class DeserializationSchemaAdapter implements BulkFormat<RowData, FileSou
return null;
}
- private RowData convert(RowData record) {
- GenericRowData outputRow = newOutputRow();
-
- for (int i = 0; i < toProjectedField.length; i++) {
- outputRow.setField(
- toProjectedField[i], formatFieldGetters[i].getFieldOrNull(record));
- }
-
- outputRow.setRowKind(record.getRowKind());
- return outputRow;
- }
-
@Override
public RowData nextRecord(RowData reuse) throws IOException {
while (true) {
RowData record = collector.records.poll();
if (record != null) {
- return convert(record);
+ return record;
}
if (readLine()) {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
new file mode 100644
index 0000000..1a0742e
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileInfoExtractorBulkFormat.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.src.FileSourceSplit;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.connector.file.src.util.RecordMapperWrapperRecordIterator;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.utils.EnrichedRowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.PartitionPathUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This {@link BulkFormat} is a wrapper that attaches file information columns to the output
+ * records.
+ */
+class FileInfoExtractorBulkFormat implements BulkFormat<RowData, FileSourceSplit> {
+
+ private final BulkFormat<RowData, FileSourceSplit> wrapped;
+ private final TypeInformation<RowData> producedType;
+
+ private final List<FileSystemTableSource.FileInfoAccessor> metadataColumnsFunctions;
+ private final List<Map.Entry<String, DataType>> partitionColumnTypes;
+ private final int[] extendedRowIndexMapping;
+
+ private final String defaultPartName;
+
+ public FileInfoExtractorBulkFormat(
+ BulkFormat<RowData, FileSourceSplit> wrapped,
+ DataType producedDataType,
+ Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns,
+ List<String> partitionColumns,
+ String defaultPartName) {
+ this.wrapped = wrapped;
+ this.producedType = InternalTypeInfo.of(producedDataType.getLogicalType());
+ this.defaultPartName = defaultPartName;
+
+ // Compute index mapping for the extended row and the functions to compute metadata
+ List<DataTypes.Field> producedRowField = DataType.getFields(producedDataType);
+ List<String> producedRowFieldNames =
+ producedRowField.stream()
+ .map(DataTypes.Field::getName)
+ .collect(Collectors.toList());
+ List<String> mutableRowFieldNames =
+ producedRowFieldNames.stream()
+ .filter(
+ key ->
+ !metadataColumns.containsKey(key)
+ && !partitionColumns.contains(key))
+ .collect(Collectors.toList());
+ List<String> metadataFieldNames = new ArrayList<>(metadataColumns.keySet());
+
+ List<String> fixedRowFieldNames =
+ Stream.concat(metadataFieldNames.stream(), partitionColumns.stream())
+ .collect(Collectors.toList());
+
+ this.partitionColumnTypes =
+ partitionColumns.stream()
+ .map(
+ fieldName ->
+ new SimpleImmutableEntry<>(
+ fieldName,
+ producedRowField
+ .get(
+ producedRowFieldNames.indexOf(
+ fieldName))
+ .getDataType()))
+ .collect(Collectors.toList());
+
+ this.extendedRowIndexMapping =
+ EnrichedRowData.computeIndexMapping(
+ producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
+ this.metadataColumnsFunctions =
+ metadataFieldNames.stream().map(metadataColumns::get).collect(Collectors.toList());
+ }
+
+ @Override
+ public Reader<RowData> createReader(Configuration config, FileSourceSplit split)
+ throws IOException {
+ return wrapReader(wrapped.createReader(config, split), split);
+ }
+
+ @Override
+ public Reader<RowData> restoreReader(Configuration config, FileSourceSplit split)
+ throws IOException {
+ return wrapReader(wrapped.restoreReader(config, split), split);
+ }
+
+ @Override
+ public boolean isSplittable() {
+ return wrapped.isSplittable();
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return producedType;
+ }
+
+ private Reader<RowData> wrapReader(Reader<RowData> superReader, FileSourceSplit split) {
+ // Fill the metadata + partition columns row
+ final GenericRowData fileInfoRowData =
+ new GenericRowData(metadataColumnsFunctions.size() + partitionColumnTypes.size());
+ int fileInfoRowIndex = 0;
+ for (; fileInfoRowIndex < metadataColumnsFunctions.size(); fileInfoRowIndex++) {
+ fileInfoRowData.setField(
+ fileInfoRowIndex,
+ metadataColumnsFunctions.get(fileInfoRowIndex).getValue(split));
+ }
+ if (!partitionColumnTypes.isEmpty()) {
+ final LinkedHashMap<String, String> partitionSpec =
+ PartitionPathUtils.extractPartitionSpecFromPath(split.path());
+ for (int partitionFieldIndex = 0;
+ fileInfoRowIndex < fileInfoRowData.getArity();
+ fileInfoRowIndex++, partitionFieldIndex++) {
+ final String fieldName = partitionColumnTypes.get(partitionFieldIndex).getKey();
+ final DataType fieldType = partitionColumnTypes.get(partitionFieldIndex).getValue();
+ if (!partitionSpec.containsKey(fieldName)) {
+ throw new RuntimeException(
+ "Cannot find the partition value from path for partition: "
+ + fieldName);
+ }
+
+ String valueStr = partitionSpec.get(fieldName);
+ valueStr = valueStr.equals(defaultPartName) ? null : valueStr;
+ fileInfoRowData.setField(
+ fileInfoRowIndex,
+ PartitionPathUtils.convertStringToInternalValue(valueStr, fieldType));
+ }
+ }
+
+ // This row is going to be reused for every record
+ final EnrichedRowData producedRowData =
+ new EnrichedRowData(fileInfoRowData, this.extendedRowIndexMapping);
+
+ return new ReaderWrapper(superReader, producedRowData);
+ }
+
+ private static final class ReaderWrapper implements Reader<RowData> {
+
+ private final Reader<RowData> wrappedReader;
+ private final EnrichedRowData producedRowData;
+
+ private ReaderWrapper(Reader<RowData> wrappedReader, EnrichedRowData producedRowData) {
+ this.wrappedReader = wrappedReader;
+ this.producedRowData = producedRowData;
+ }
+
+ @Nullable
+ @Override
+ public RecordIterator<RowData> readBatch() throws IOException {
+ RecordIterator<RowData> iterator = wrappedReader.readBatch();
+ if (iterator == null) {
+ return null;
+ }
+ return new RecordMapperWrapperRecordIterator<>(
+ iterator,
+ physicalRowData -> {
+ producedRowData.replaceMutableRow(physicalRowData);
+ return producedRowData;
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ this.wrappedReader.close();
+ }
+ }
+}
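[Editor's note] The wrapReader method above recovers partition values from the split's path before filling the fixed row. A minimal sketch of that recovery step (hypothetical path; `extractPartitionSpecFromPath` is the same utility the patch calls):

import java.util.LinkedHashMap;

import org.apache.flink.core.fs.Path;
import org.apache.flink.table.utils.PartitionPathUtils;

/** Sketch: Hive-style key=value directories become an ordered partition spec. */
final class PartitionSpecSketch {
    public static void main(String[] args) {
        Path path = new Path("file:///tmp/warehouse/t/a=1/b=2/part-0.csv");
        LinkedHashMap<String, String> spec =
                PartitionPathUtils.extractPartitionSpecFromPath(path);
        System.out.println(spec); // prints {a=1, b=2}
    }
}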
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
index 154e51e..2e9af35 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
@@ -44,6 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSin
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink.BucketsBuilder;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
@@ -281,12 +282,8 @@ public class FileSystemTableSink extends AbstractFileSystemTable
}
private Optional<CompactReader.Factory<RowData>> createCompactReaderFactory(Context context) {
- if (bulkReaderFormat != null) {
- final BulkFormat<RowData, FileSourceSplit> format =
- bulkReaderFormat.createRuntimeDecoder(
- createSourceContext(context), getPhysicalDataType());
- return Optional.of(CompactBulkReader.factory(format));
- } else if (formatFactory != null) {
+ // TODO FLINK-19845 old format factory, to be removed soon.
+ if (formatFactory != null) {
final InputFormat<RowData, ?> format =
formatFactory.createReader(createReaderContext());
if (format instanceof FileInputFormat) {
@@ -294,19 +291,36 @@ public class FileSystemTableSink extends AbstractFileSystemTable
return Optional.of(
FileInputFormatCompactReader.factory((FileInputFormat<RowData>) format));
}
+ return Optional.empty();
+ }
+
+ // Compute producedDataType (including partition fields) and physicalDataType (excluding
+ // partition fields)
+ final DataType producedDataType = getPhysicalDataType();
+ final DataType physicalDataType =
+ DataType.getFields(producedDataType).stream()
+ .filter(field -> !partitionKeys.contains(field.getName()))
+ .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
+
+ if (bulkReaderFormat != null) {
+ final BulkFormat<RowData, FileSourceSplit> format =
+ new FileInfoExtractorBulkFormat(
+ bulkReaderFormat.createRuntimeDecoder(
+ createSourceContext(context), physicalDataType),
+ producedDataType,
+ Collections.emptyMap(),
+ partitionKeys,
+ defaultPartName);
+ return Optional.of(CompactBulkReader.factory(format));
} else if (deserializationFormat != null) {
- // NOTE, we need pass full format types to deserializationFormat
final DeserializationSchema<RowData> decoder =
deserializationFormat.createRuntimeDecoder(
- createSourceContext(context),
- getPhysicalDataTypeWithoutPartitionColumns());
- final int[] projectedFields =
- IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray();
- DeserializationSchemaAdapter format =
- new DeserializationSchemaAdapter(
- decoder,
- getPhysicalDataType(),
- projectedFields,
+ createSourceContext(context), physicalDataType);
+ final BulkFormat<RowData, FileSourceSplit> format =
+ new FileInfoExtractorBulkFormat(
+ new DeserializationSchemaAdapter(decoder),
+ producedDataType,
+ Collections.emptyMap(),
partitionKeys,
defaultPartName);
return Optional.of(CompactBulkReader.factory(format));
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index f187910..d1b12a7 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -28,6 +28,7 @@ import org.apache.flink.connector.file.src.FileSourceSplit;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
+import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
@@ -42,7 +43,9 @@ import org.apache.flink.table.connector.source.abilities.SupportsFilterPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsPartitionPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
@@ -54,6 +57,7 @@ import org.apache.flink.table.utils.TableSchemaUtils;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -72,16 +76,20 @@ public class FileSystemTableSource extends AbstractFileSystemTable
SupportsProjectionPushDown,
SupportsLimitPushDown,
SupportsPartitionPushDown,
- SupportsFilterPushDown {
+ SupportsFilterPushDown,
+ SupportsReadingMetadata {
@Nullable private final DecodingFormat<BulkFormat<RowData, FileSourceSplit>> bulkReaderFormat;
@Nullable private final DecodingFormat<DeserializationSchema<RowData>> deserializationFormat;
@Nullable private final FileSystemFormatFactory formatFactory;
- private int[][] projectedFields;
+ // These mutable fields are set by the planner via the push-down and metadata interfaces.
private List<Map<String, String>> remainingPartitions;
private List<ResolvedExpression> filters;
private Long limit;
+ private int[][] projectFields;
+ private List<String> metadataKeys;
+ private DataType producedDataType;
public FileSystemTableSource(
DynamicTableFactory.Context context,
@@ -101,54 +109,123 @@ public class FileSystemTableSource extends AbstractFileSystemTable
this.bulkReaderFormat = bulkReaderFormat;
this.deserializationFormat = deserializationFormat;
this.formatFactory = formatFactory;
+
+ this.producedDataType = context.getPhysicalRowDataType();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+ // When this table has no partitions, just return an empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
- // When this table has no partition, just return a empty source.
return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
- } else if (bulkReaderFormat != null) {
- if (bulkReaderFormat instanceof BulkDecodingFormat
- && filters != null
- && filters.size() > 0) {
- ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
+ }
+
+ // Physical type is computed from the full data type, filtering out partition and
+ // metadata columns. This type is going to be used by formats to parse the input.
+ List<DataTypes.Field> producedDataTypeFields = DataType.getFields(producedDataType);
+ if (metadataKeys != null && !metadataKeys.isEmpty()) {
+ // If metadata keys are present, then by the SupportsReadingMetadata contract all the
+ // metadata columns are at the end of the producedDataType, so we can just drop the
+ // last metadataKeys.size() fields from the list.
+ producedDataTypeFields =
+ producedDataTypeFields.subList(
+ 0, producedDataTypeFields.size() - metadataKeys.size());
+ }
+ DataType physicalDataType =
+ producedDataTypeFields.stream()
+ .filter(f -> partitionKeys == null || !partitionKeys.contains(f.getName()))
+ .collect(Collectors.collectingAndThen(Collectors.toList(), DataTypes::ROW));
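+ // Example (hypothetical schema): for a table (a INT, dt STRING, filepath STRING METADATA)
+ // partitioned by dt, the producedDataType is ROW<a, dt, filepath>, while the physical
+ // type handed to the format is just ROW<a>.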
+
+ // Resolve metadata and make sure to filter out metadata not in the producedDataType
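+ // (iterating the field names of producedDataType preserves its field order)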
+ List<String> metadataKeys =
+ (this.metadataKeys == null) ? Collections.emptyList() : this.metadataKeys;
+ metadataKeys =
+ DataType.getFieldNames(producedDataType).stream()
+ .filter(metadataKeys::contains)
+ .collect(Collectors.toList());
+ List<ReadableFileInfo> metadataToExtract =
+ metadataKeys.stream().map(ReadableFileInfo::resolve).collect(Collectors.toList());
+
+ // Filter out partition columns not in producedDataType
+ List<String> partitionKeysToExtract =
+ DataType.getFieldNames(producedDataType).stream()
+ .filter(this.partitionKeys::contains)
+ .collect(Collectors.toList());
+
+ // TODO FLINK-19845: the old format factory is to be removed soon. It doesn't support
+ // metadata.
+ if (formatFactory != null) {
+ if (!metadataToExtract.isEmpty()) {
+ throw new IllegalStateException(
+ "Metadata are not supported for format factories using FileSystemFormatFactory");
}
- BulkFormat<RowData, FileSourceSplit> bulkFormat =
- bulkReaderFormat.createRuntimeDecoder(scanContext, getProjectedDataType());
- return createSourceProvider(bulkFormat);
- } else if (formatFactory != null) {
+
// The ContinuousFileMonitoringFunction cannot accept multiple paths. By default,
// StreamEnv.createInput creates a continuous function, so avoid using
// ContinuousFileMonitoringFunction.
return SourceFunctionProvider.of(
new InputFormatSourceFunction<>(
getInputFormat(),
- InternalTypeInfo.of(getProjectedDataType().getLogicalType())),
+ InternalTypeInfo.of(producedDataType.getLogicalType())),
true);
+ }
+
+ if (bulkReaderFormat != null) {
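+ // Filters can only be pushed into formats that implement BulkDecodingFormat.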
+ if (bulkReaderFormat instanceof BulkDecodingFormat
+ && filters != null
+ && filters.size() > 0) {
+ ((BulkDecodingFormat<RowData>) bulkReaderFormat).applyFilters(filters);
+ }
+ BulkFormat<RowData, FileSourceSplit> bulkFormat =
+ wrapBulkFormat(
+ bulkReaderFormat.createRuntimeDecoder(scanContext, physicalDataType),
+ producedDataType,
+ metadataToExtract,
+ partitionKeysToExtract);
+ return createSourceProvider(bulkFormat);
} else if (deserializationFormat != null) {
- // NOTE, we need pass full format types to deserializationFormat
DeserializationSchema<RowData> decoder =
- deserializationFormat.createRuntimeDecoder(
- scanContext, getPhysicalDataTypeWithoutPartitionColumns());
- return createSourceProvider(
- new DeserializationSchemaAdapter(
- decoder,
- getPhysicalDataType(),
- readFields(),
- partitionKeys,
- defaultPartName));
- // return sourceProvider(wrapDeserializationFormat(deserializationFormat), scanContext);
+ deserializationFormat.createRuntimeDecoder(scanContext, physicalDataType);
+ BulkFormat<RowData, FileSourceSplit> bulkFormat =
+ wrapBulkFormat(
+ new DeserializationSchemaAdapter(decoder),
+ producedDataType,
+ metadataToExtract,
+ partitionKeysToExtract);
+ return createSourceProvider(bulkFormat);
} else {
throw new TableException("Can not find format factory.");
}
}
+ /**
+ * Wraps bulk format in a {@link FileInfoExtractorBulkFormat} and {@link LimitableBulkFormat},
+ * if needed.
+ */
+ private BulkFormat<RowData, FileSourceSplit> wrapBulkFormat(
+ BulkFormat<RowData, FileSourceSplit> bulkFormat,
+ DataType producedDataType,
+ List<ReadableFileInfo> metadata,
+ List<String> partitionKeys) {
+ if (!metadata.isEmpty() || !partitionKeys.isEmpty()) {
+ bulkFormat =
+ new FileInfoExtractorBulkFormat(
+ bulkFormat,
+ producedDataType,
+ metadata.stream()
+ .collect(
+ Collectors.toMap(
+ ReadableFileInfo::getKey,
+ ReadableFileInfo::getAccessor)),
+ partitionKeys,
+ defaultPartName);
+ }
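+ // Apply the limit push-down, if any, on top of the wrapped format.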
+ bulkFormat = LimitableBulkFormat.create(bulkFormat, limit);
+ return bulkFormat;
+ }
+
private SourceProvider createSourceProvider(BulkFormat<RowData, FileSourceSplit> bulkFormat) {
- FileSource.FileSourceBuilder<RowData> builder =
- FileSource.forBulkFileFormat(
- LimitableBulkFormat.create(bulkFormat, limit), paths());
- return SourceProvider.of(builder.build());
+ return SourceProvider.of(FileSource.forBulkFileFormat(bulkFormat, paths()).build());
}
private Path[] paths() {
@@ -195,7 +272,12 @@ public class FileSystemTableSource extends AbstractFileSystemTable
@Override
public int[] getProjectFields() {
- return readFields();
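+ // With no pushed-down projection, expose all physical fields in declaration order.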
+ return projectFields == null
+ ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType()))
+ .toArray()
+ : Arrays.stream(projectFields)
+ .mapToInt(array -> array[0])
+ .toArray();
}
@Override
@@ -271,19 +353,17 @@ public class FileSystemTableSource extends AbstractFileSystemTable
}
@Override
- public void applyProjection(int[][] projectedFields, DataType producedDataType) {
- this.projectedFields = projectedFields;
- }
-
- @Override
public FileSystemTableSource copy() {
FileSystemTableSource source =
new FileSystemTableSource(
context, bulkReaderFormat, deserializationFormat, formatFactory);
- source.projectedFields = projectedFields;
+ source.partitionKeys = partitionKeys;
source.remainingPartitions = remainingPartitions;
source.filters = filters;
source.limit = limit;
+ source.projectFields = projectFields;
+ source.metadataKeys = metadataKeys;
+ source.producedDataType = producedDataType;
return source;
}
@@ -314,20 +394,84 @@ public class FileSystemTableSource extends AbstractFileSystemTable
return map;
}
- private int[] readFields() {
- return projectedFields == null
- ? IntStream.range(0, DataType.getFieldCount(getPhysicalDataType())).toArray()
- : Arrays.stream(projectedFields).mapToInt(array -> array[0]).toArray();
+ // --------------------------------------------------------------------------------------------
+ // Methods to apply projections and metadata;
+ // these influence the final output and the physical type used by the formats
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void applyProjection(int[][] projectedFields, DataType producedDataType) {
+ this.projectFields = projectedFields;
+ this.producedDataType = producedDataType;
}
- private DataType getProjectedDataType() {
- final DataType physicalDataType = super.getPhysicalDataType();
+ @Override
+ public void applyReadableMetadata(List<String> metadataKeys, DataType producedDataType) {
+ this.metadataKeys = metadataKeys;
+ this.producedDataType = producedDataType;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Metadata handling
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public Map<String, DataType> listReadableMetadata() {
+ return Arrays.stream(ReadableFileInfo.values())
+ .collect(Collectors.toMap(ReadableFileInfo::getKey, ReadableFileInfo::getDataType));
+ }
+
+ interface FileInfoAccessor extends Serializable {
+ /**
+ * Accesses the information from the given {@link FileSourceSplit}. The
+ * return value type must be an internal type.
+ */
+ Object getValue(FileSourceSplit split);
+ }
+
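+ /**
+ * Metadata columns that the filesystem source can expose; {@link #listReadableMetadata()}
+ * is derived from the values of this enum.
+ */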
+ enum ReadableFileInfo implements Serializable {
+ FILEPATH(
+ "filepath",
+ DataTypes.STRING().notNull(),
+ new FileInfoAccessor() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object getValue(FileSourceSplit split) {
+ return StringData.fromString(split.path().getPath());
+ }
+ });
+
+ final String key;
+ final DataType dataType;
+ final FileInfoAccessor converter;
+
+ ReadableFileInfo(String key, DataType dataType, FileInfoAccessor converter) {
+ this.key = key;
+ this.dataType = dataType;
+ this.converter = converter;
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public DataType getDataType() {
+ return dataType;
+ }
+
+ public FileInfoAccessor getAccessor() {
+ return converter;
+ }
- // If we haven't projected fields, we just return the original physical data type,
- // otherwise we need to compute the physical data type depending on the projected fields.
- if (projectedFields == null) {
- return physicalDataType;
+ public static ReadableFileInfo resolve(String key) {
+ return Arrays.stream(ReadableFileInfo.values())
+ .filter(readableFileInfo -> readableFileInfo.getKey().equals(key))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new IllegalArgumentException(
+ "Cannot resolve the provided ReadableMetadata key"));
}
- return DataType.projectFields(physicalDataType, projectedFields);
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
index aae8e4c..dbec987 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvDeserializationSchema.java
@@ -43,25 +43,25 @@ import java.util.List;
*/
public class TestCsvDeserializationSchema implements DeserializationSchema<RowData> {
- @SuppressWarnings("rawtypes")
- private final DataStructureConverter[] csvRowToRowDataConverters;
+ private final List<DataType> physicalFieldTypes;
+ private final int physicalFieldCount;
private final TypeInformation<RowData> typeInfo;
- private final int fieldCount;
- private final List<DataType> fieldTypes;
+ private final int[] indexMapping;
- private transient FieldParser<?>[] fieldParsers;
+ @SuppressWarnings("rawtypes")
+ private transient DataStructureConverter[] csvRowToRowDataConverters;
- public TestCsvDeserializationSchema(DataType dataType) {
- this.fieldTypes = DataType.getFieldDataTypes(dataType);
- this.fieldCount = fieldTypes.size();
+ private transient FieldParser<?>[] fieldParsers;
- this.csvRowToRowDataConverters =
- fieldTypes.stream()
- .map(DataStructureConverters::getConverter)
- .toArray(DataStructureConverter[]::new);
+ public TestCsvDeserializationSchema(DataType physicalDataType, List<String> orderedCsvColumns) {
+ this.physicalFieldTypes = DataType.getFieldDataTypes(physicalDataType);
+ this.physicalFieldCount = physicalFieldTypes.size();
+ this.typeInfo = InternalTypeInfo.of((RowType) physicalDataType.getLogicalType());
- this.typeInfo = InternalTypeInfo.of((RowType) dataType.getLogicalType());
+ List<String> physicalFieldNames = DataType.getFieldNames(physicalDataType);
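+ // Maps each CSV column position to the index of the matching physical field, or -1
+ // when the physical row does not contain that column (it will then be skipped).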
+ this.indexMapping =
+ orderedCsvColumns.stream().mapToInt(physicalFieldNames::indexOf).toArray();
initFieldParsers();
}
@@ -74,14 +74,18 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
@SuppressWarnings("unchecked")
@Override
public RowData deserialize(byte[] message) throws IOException {
- GenericRowData row = new GenericRowData(fieldCount);
+ GenericRowData row = new GenericRowData(physicalFieldCount);
int startIndex = 0;
- for (int i = 0; i < fieldCount; i++) {
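+ // Every CSV column is parsed so that startIndex advances past it, but only columns
+ // mapped into the physical row are materialized.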
+ for (int csvColumn = 0; csvColumn < indexMapping.length; csvColumn++) {
startIndex =
- this.fieldParsers[i].resetErrorStateAndParse(
+ fieldParsers[csvColumn].resetErrorStateAndParse(
message, startIndex, message.length, new byte[] {','}, null);
- row.setField(
- i, csvRowToRowDataConverters[i].toInternal(fieldParsers[i].getLastResult()));
+ if (indexMapping[csvColumn] != -1) {
+ row.setField(
+ indexMapping[csvColumn],
+ csvRowToRowDataConverters[csvColumn].toInternal(
+ fieldParsers[csvColumn].getLastResult()));
+ }
}
return row;
}
@@ -97,9 +101,19 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
}
private void initFieldParsers() {
- this.fieldParsers = new FieldParser<?>[fieldCount];
- for (int i = 0; i < fieldTypes.size(); i++) {
- DataType fieldType = fieldTypes.get(i);
+ int csvRowLength = indexMapping.length;
+ this.fieldParsers = new FieldParser<?>[csvRowLength];
+ this.csvRowToRowDataConverters = new DataStructureConverter[csvRowLength];
+ for (int csvColumn = 0; csvColumn < csvRowLength; csvColumn++) {
+ if (indexMapping[csvColumn] == -1) {
+ // The output type doesn't include this field, so just assign a string parser to
+ // skip it
+ this.fieldParsers[csvColumn] =
+ InstantiationUtil.instantiate(
+ FieldParser.getParserForType(String.class), FieldParser.class);
+ continue;
+ }
+ DataType fieldType = physicalFieldTypes.get(indexMapping[csvColumn]);
Class<? extends FieldParser<?>> parserType =
FieldParser.getParserForType(
logicalTypeRootToFieldParserClass(
@@ -110,7 +124,9 @@ public class TestCsvDeserializationSchema implements DeserializationSchema<RowDa
FieldParser<?> p = InstantiationUtil.instantiate(parserType, FieldParser.class);
- this.fieldParsers[i] = p;
+ this.fieldParsers[csvColumn] = p;
+ this.csvRowToRowDataConverters[csvColumn] =
+ DataStructureConverters.getConverter(fieldType);
}
}
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
index 5595b87..03a80e7 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/filesystem/TestCsvFileSystemFormatFactory.java
@@ -43,7 +43,9 @@ import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_FIELD_DELIMITER;
import static org.apache.flink.api.java.io.CsvOutputFormat.DEFAULT_LINE_DELIMITER;
@@ -118,11 +120,24 @@ public class TestCsvFileSystemFormatFactory
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
+ List<String> schemaFields =
+ DataType.getFieldNames(context.getPhysicalRowDataType()).stream()
+ .filter(
+ field ->
+ !context.getCatalogTable()
+ .getPartitionKeys()
+ .contains(field))
+ .collect(Collectors.toList());
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType physicalDataType) {
- return new TestCsvDeserializationSchema(physicalDataType);
+ // TestCsvDeserializationSchema has no knowledge of the field names, and the
+ // implicit assumption made by the tests is that the CSV rows contain only the
+ // physical fields (excluding partition fields), in the same order as defined in the
+ // table declaration. This is why TestCsvDeserializationSchema needs
+ // schemaFields.
+ return new TestCsvDeserializationSchema(physicalDataType, schemaFields);
}
@Override