You are viewing a plain text version of this content. The canonical (HTML) version is linked from the original archive page; that hyperlink is not preserved in this plain-text copy.
Posted to commits@flink.apache.org by lz...@apache.org on 2020/06/11 11:42:07 UTC

[flink] branch master updated: [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 35e2fca  [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
35e2fca is described below

commit 35e2fca5c9b59a87a6f1a17628daf75d3de575b0
Author: Jingsong Lee <ji...@gmail.com>
AuthorDate: Thu Jun 11 19:41:36 2020 +0800

    [FLINK-18237][fs-connector] Exception when reading filesystem partitioned table with stream mode
    
    This closes #12576
---
 .../formats/csv/CsvFilesystemStreamITCase.java     |  9 +++----
 ...ase.java => CsvFilesystemStreamSinkITCase.java} |  4 +--
 .../table/filesystem/FileSystemTableSource.java    | 31 +++++++++++++++++++---
 3 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
index 976e70e..6e5d2e1 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.formats.csv;
 
-import org.apache.flink.table.planner.runtime.stream.FsStreamingSinkITCaseBase;
+import org.apache.flink.table.planner.runtime.stream.sql.StreamFileSystemITCaseBase;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -26,13 +26,12 @@ import java.util.List;
 /**
  * ITCase to test csv format for {@link CsvFileSystemFormatFactory} in stream mode.
  */
-public class CsvFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
+public class CsvFilesystemStreamITCase extends StreamFileSystemITCaseBase {
+
 	@Override
-	public String[] additionalProperties() {
+	public String[] formatProperties() {
 		List<String> ret = new ArrayList<>();
 		ret.add("'format'='csv'");
-		// for test purpose
-		ret.add("'sink.rolling-policy.file-size'='1b'");
 		return ret.toArray(new String[0]);
 	}
 }
diff --git a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
similarity index 92%
copy from flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
copy to flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
index 976e70e..8b694dc 100644
--- a/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamITCase.java
+++ b/flink-formats/flink-csv/src/test/java/org/apache/flink/formats/csv/CsvFilesystemStreamSinkITCase.java
@@ -24,9 +24,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * ITCase to test csv format for {@link CsvFileSystemFormatFactory} in stream mode.
+ * ITCase to test csv format for {@link CsvFileSystemFormatFactory} for streaming sink.
  */
-public class CsvFilesystemStreamITCase extends FsStreamingSinkITCaseBase {
+public class CsvFilesystemStreamSinkITCase extends FsStreamingSinkITCaseBase {
 	@Override
 	public String[] additionalProperties() {
 		List<String> ret = new ArrayList<>();
diff --git a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
index 35ba899..d834907 100644
--- a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
+++ b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
@@ -19,24 +19,31 @@
 package org.apache.flink.table.filesystem;
 
 import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.io.CollectionInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DelegatingConfiguration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.factories.FileSystemFormatFactory;
+import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter;
 import org.apache.flink.table.sources.FilterableTableSource;
-import org.apache.flink.table.sources.InputFormatTableSource;
 import org.apache.flink.table.sources.LimitableTableSource;
 import org.apache.flink.table.sources.PartitionableTableSource;
 import org.apache.flink.table.sources.ProjectableTableSource;
+import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.utils.PartitionPathUtils;
+import org.apache.flink.table.utils.TableConnectorUtils;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -52,7 +59,8 @@ import static org.apache.flink.table.filesystem.FileSystemTableFactory.createFor
 /**
  * File system table source.
  */
-public class FileSystemTableSource extends InputFormatTableSource<RowData> implements
+public class FileSystemTableSource implements
+		StreamTableSource<RowData>,
 		PartitionableTableSource,
 		ProjectableTableSource<RowData>,
 		LimitableTableSource<RowData>,
@@ -111,7 +119,22 @@ public class FileSystemTableSource extends InputFormatTableSource<RowData> imple
 	}
 
 	@Override
-	public InputFormat<RowData, ?> getInputFormat() {
+	public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) {
+		@SuppressWarnings("unchecked")
+		TypeInformation<RowData> typeInfo =
+				(TypeInformation<RowData>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
+		// Avoid using ContinuousFileMonitoringFunction
+		InputFormatSourceFunction<RowData> func = new InputFormatSourceFunction<>(getInputFormat(), typeInfo);
+		DataStreamSource<RowData> source = execEnv.addSource(func, explainSource(), typeInfo);
+		return source.name(explainSource());
+	}
+
+	@Override
+	public boolean isBounded() {
+		return true;
+	}
+
+	private InputFormat<RowData, ?> getInputFormat() {
 		// When this table has no partition, just return a empty source.
 		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
 			return new CollectionInputFormat<>(new ArrayList<>(), null);
@@ -301,7 +324,7 @@ public class FileSystemTableSource extends InputFormatTableSource<RowData> imple
 
 	@Override
 	public String explainSource() {
-		return super.explainSource() +
+		return TableConnectorUtils.generateRuntimeName(getClass(), getTableSchema().getFieldNames()) +
 				(readPartitions == null ? "" : ", readPartitions=" + readPartitions) +
 				(selectFields == null ? "" : ", selectFields=" + Arrays.toString(selectFields)) +
 				(limit == null ? "" : ", limit=" + limit) +