You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/11 11:49:33 UTC
[flink] branch release-1.11 updated: [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new ca9bd80 [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
ca9bd80 is described below
commit ca9bd803a6cc9424040a5c5f834ee9e5efaa0fb1
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jun 11 19:41:36 2020 +0800
[FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
This closes #12576
---
.../formats/csv/CsvFilesystemStreamITCase.java | 9 +++----
...ase.java => CsvFilesystemStreamSinkITCase.java} | 4 +--
.../table/filesystem/FileSystemTableSource.java | 31 +++++++++++++++++++---
3 files changed, 33 insertions(+), 11 deletions(-)
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
index 976e70e..6e5d2e1 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
@@ -18,7 +18,7 @@
package org.apache.flink.formats.csv;
-import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+import org.apache.flink.table.planner.runtime.stream.sql.StreamFileSystemITCaseBase;
import java.util.ArrayList;
import java.util.List;
@@ -26,13 +26,12 @@ import java.util.List;
/**
* ITCase to test csv format for {@link CsvFileSystemFormatFactory} in stream mode.
*/
-public class CsvFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
+public class CsvFilesystemStreamITCase extends StreamFileSystemITCaseBase {
+
@Override
- public String[] additionalProperties() {
+ public String[] formatProperties() {
List<String> ret = new ArrayList<>();
ret.add("'format'='csv'");
- // for test purpose
- ret.add("'sink.rolling-policy.file-size'='1b'");
return ret.toArray(new String[0]);
}
}
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
similarity index 92%
copy from flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
copy to flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
index 976e70e..8b694dc 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
import java.util.List;
/**
- * ITCase to test csv format for {@link CsvFileSystemFormatFactory} in stream mode.
+ * ITCase to test csv format for {@link CsvFileSystemFormatFactory} for streaming sink.
*/
-public class CsvFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
+public class CsvFilesystemStreamSinkITCase extends FsStreamingSinkITCaseBase {
@Override
public String[] additionalProperties() {
List<String> ret = new ArrayList<>();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index 35ba899..d834907 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -19,24 +19,31 @@
package org.apache.flink.table.filesystem;
import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.io.CollectionInputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.InputFormatTableSource;
import org.apache.flink.table.sources.LimitableTableSource;
import org.apache.flink.table.sources.PartitionableTableSource;
import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.table.utils.TableConnectorUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -52,7 +59,8 @@ import static org.apache.flink.table.filesystem.FileSystemTableFactory.createFor
/**
* File system table source.
*/
-public class FileSystemTableSource extends InputFormatTableSource<RowData> implements
+public class FileSystemTableSource implements
+ StreamTableSource<RowData>,
PartitionableTableSource,
ProjectableTableSource<RowData>,
LimitableTableSource<RowData>,
@@ -111,7 +119,22 @@ public class FileSystemTableSource extends InputFormatTableSource<RowData> imple
}
@Override
- public InputFormat<RowData, ?> getInputFormat() {
+ public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
+ @SuppressWarnings("unchecked")
+ TypeInformation<RowData> typeInfo =
+ (TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+ // Avoid using ContinuousFileMonitoringFunction
+ InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
+ DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
+ return source.name(explainSource());
+ }
+
+ @Override
+ public boolean isBounded() {
+ return true;
+ }
+
+ private InputFormat<RowData, ?> getInputFormat() {
// When this table has no partition, just return a empty source.
if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
return new CollectionInputFormat<>(new ArrayList<>(), null);
@@ -301,7 +324,7 @@ public class FileSystemTableSource extends InputFormatTableSource<RowData> imple
@Override
public String explainSource() {
- return super.explainSource() +
+ return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) +
(readPartitions == null ? "" : ", readPartitions=" + readPartitions) +
(selectFields == null ? "" : ", selectFields=" + Arrays.toString(selectFields)) +
(limit == null ? "" : ", limit=" + limit) +