You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/28 04:34:53 UTC

[GitHub] [flink] wuchong commented on a change in pull request #13605: [FLINK-19599][table] Introduce Filesystem format factories to integrate new FileSource to table

wuchong commented on a change in pull request #13605:
URL: https://github.com/apache/flink/pull/13605#discussion_r513164938



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkFormatFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkFormat} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkFormatFactory extends DecodingFormatFactory<BulkFormat<RowData>> {
+	// interface is used for discovery but is already fully specified by the generics

Review comment:
       Remove this?

##########
File path: flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetFileSystemFormatFactory.java
##########
@@ -93,124 +117,17 @@ private static Configuration getParquetConfiguration(ReadableConfig options) {
 	}
 
 	@Override
-	public InputFormat<RowData, ?> createReader(ReaderContext context) {
-		return new ParquetInputFormat(
-				context.getPaths(),
-				context.getSchema().getFieldNames(),
-				context.getSchema().getFieldDataTypes(),
-				context.getProjectFields(),
-				context.getDefaultPartName(),
-				context.getPushedDownLimit(),
-				getParquetConfiguration(context.getFormatOptions()),
-				context.getFormatOptions().get(UTC_TIMEZONE));
+	public String factoryIdentifier() {
+		return "parquet";
 	}
 
 	@Override
-	public Optional<BulkWriter.Factory<RowData>> createBulkWriterFactory(WriterContext context) {
-		return Optional.of(ParquetRowDataBuilder.createWriterFactory(
-				RowType.of(Arrays.stream(context.getFormatFieldTypes())
-								.map(DataType::getLogicalType)
-								.toArray(LogicalType[]::new),
-						context.getFormatFieldNames()),
-				getParquetConfiguration(context.getFormatOptions()),
-				context.getFormatOptions().get(UTC_TIMEZONE)));
+	public Set<ConfigOption<?>> requiredOptions() {
+		return new HashSet<>();
 	}
 
 	@Override
-	public Optional<Encoder<RowData>> createEncoder(WriterContext context) {
-		return Optional.empty();
-	}
-
-	/**
-	 * An implementation of {@link ParquetInputFormat} to read {@link RowData} records
-	 * from Parquet files.
-	 */
-	public static class ParquetInputFormat extends FileInputFormat<RowData> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final String[] fullFieldNames;
-		private final DataType[] fullFieldTypes;
-		private final int[] selectedFields;
-		private final String partDefaultName;
-		private final boolean utcTimestamp;
-		private final SerializableConfiguration conf;
-		private final long limit;
-
-		private transient ParquetColumnarRowSplitReader reader;
-		private transient long currentReadCount;
-
-		public ParquetInputFormat(
-				Path[] paths,
-				String[] fullFieldNames,
-				DataType[] fullFieldTypes,
-				int[] selectedFields,
-				String partDefaultName,
-				long limit,
-				Configuration conf,
-				boolean utcTimestamp) {
-			super.setFilePaths(paths);
-			this.limit = limit;
-			this.partDefaultName = partDefaultName;
-			this.fullFieldNames = fullFieldNames;
-			this.fullFieldTypes = fullFieldTypes;
-			this.selectedFields = selectedFields;
-			this.conf = new SerializableConfiguration(conf);
-			this.utcTimestamp = utcTimestamp;
-		}
-
-		@Override
-		public void open(FileInputSplit fileSplit) throws IOException {
-			// generate partition specs.
-			List<String> fieldNameList = Arrays.asList(fullFieldNames);
-			LinkedHashMap<String, String> partSpec = PartitionPathUtils.extractPartitionSpecFromPath(
-					fileSplit.getPath());
-			LinkedHashMap<String, Object> partObjects = new LinkedHashMap<>();
-			partSpec.forEach((k, v) -> partObjects.put(k, restorePartValueFromType(
-					partDefaultName.equals(v) ? null : v,
-					fullFieldTypes[fieldNameList.indexOf(k)])));
-
-			this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
-					utcTimestamp,
-					true,
-					conf.conf(),
-					fullFieldNames,
-					fullFieldTypes,
-					partObjects,
-					selectedFields,
-					DEFAULT_SIZE,
-					new Path(fileSplit.getPath().toString()),
-					fileSplit.getStart(),
-					fileSplit.getLength());
-			this.currentReadCount = 0L;
-		}
-
-		@Override
-		public boolean supportsMultiPaths() {
-			return true;
-		}
-
-		@Override
-		public boolean reachedEnd() throws IOException {
-			if (currentReadCount >= limit) {
-				return true;
-			} else {
-				return reader.reachedEnd();
-			}
-		}
-
-		@Override
-		public RowData nextRecord(RowData reuse) {
-			currentReadCount++;
-			return reader.nextRecord();
-		}
-
-		@Override
-		public void close() throws IOException {
-			if (reader != null) {
-				this.reader.close();
-			}
-			this.reader = null;
-		}
+	public Set<ConfigOption<?>> optionalOptions() {
+		return new HashSet<>();

Review comment:
       Why is the `UTC_TIMEZONE` option not in the set?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/AbstractFileSystemTable.java
##########
@@ -53,15 +62,75 @@
 		context.getCatalogTable().getOptions().forEach(tableOptions::setString);
 		this.schema = TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
 		this.partitionKeys = context.getCatalogTable().getPartitionKeys();
-		this.path = new Path(context.getCatalogTable().getOptions().getOrDefault(PATH.key(), PATH.defaultValue()));
-		this.defaultPartName = context.getCatalogTable().getOptions().getOrDefault(
-				PARTITION_DEFAULT_NAME.key(), PARTITION_DEFAULT_NAME.defaultValue());
+		this.path = new Path(tableOptions.get(PATH));
+		this.defaultPartName = tableOptions.get(PARTITION_DEFAULT_NAME);
 	}
 
-	static FileSystemFormatFactory createFormatFactory(ReadableConfig tableOptions) {
+	ReadableConfig formatOptions(String identifier) {
+		return new DelegatingConfiguration(tableOptions, identifier + ".");
+	}
+
+	FileSystemFormatFactory createFormatFactory() {
 		return FactoryUtil.discoverFactory(
 				Thread.currentThread().getContextClassLoader(),
 				FileSystemFormatFactory.class,
 				tableOptions.get(FactoryUtil.FORMAT));
 	}
+
+	@SuppressWarnings("rawtypes")
+	<F extends EncodingFormatFactory<?>> Optional<EncodingFormat> discoverOptionalEncodingFormat(

Review comment:
       Why not reuse `FactoryUtil.TableFactoryHelper#discoverOptionalEncodingFormat`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -88,7 +92,16 @@ private FileSystemTableSource(
 	}
 
 	@Override
-	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
+	public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+		Optional<BulkDecodingFormat<RowData>> bulkDecodingFormat = discoverBulkDecodingFormat();
+
+		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
+			// When this table has no partition, just return a empty source.
+			return InputFormatProvider.of(new CollectionInputFormat<>(new ArrayList<>(), null));
+		} else if (bulkDecodingFormat.isPresent()) {
+			return SourceProvider.of(createBulkFormatSource(bulkDecodingFormat.get(), scanContext));
+		}
+
 		return new DataStreamScanProvider() {
 			@Override
 			public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {

Review comment:
       Why should we avoid using `ContinuousFileMonitoringFunction` here? And why not return a `SourceFunctionProvider` of `InputFormatSourceFunction` directly?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkFormatFactory.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.connector.file.src.reader.BulkFormat;
+import org.apache.flink.table.connector.format.BulkDecodingFormat;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkFormat} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkFormatFactory extends DecodingFormatFactory<BulkFormat<RowData>> {

Review comment:
       How about naming this `BulkReaderFormatFactory`, which is more aligned with `BulkWriterFormatFactory`?

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/EncoderFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link Encoder} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface EncoderFactory extends EncodingFormatFactory<Encoder<RowData>> {

Review comment:
       Do we really need this factory? It seems to duplicate `SerializationSchema`. I'm afraid we will introduce a lot of duplicate code if we can't reuse the existing `SerializationFormatFactory`s.

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/BulkWriterFactory.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.table.data.RowData;
+
+/**
+ * Base interface for configuring a {@link BulkWriter.Factory} for file system connector.
+ *
+ * @see FactoryUtil#createTableFactoryHelper(DynamicTableFactory, DynamicTableFactory.Context)
+ */
+@Internal
+public interface BulkWriterFactory extends EncodingFormatFactory<BulkWriter.Factory<RowData>> {

Review comment:
       How about naming this `BulkWriterFormatFactory`, which is more aligned with `BulkReaderFormatFactory`?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -199,15 +202,32 @@ private Path toStagingPath() {
 	}
 
 	@SuppressWarnings("unchecked")
-	private OutputFormatFactory<RowData> createOutputFormatFactory() {
-		Object writer = createWriter();
+	private OutputFormatFactory<RowData> createOutputFormatFactory(Context sinkContext) {
+		Object writer = createWriter(sinkContext);
 		return writer instanceof Encoder ?
 				path -> createEncoderOutputFormat((Encoder<RowData>) writer, path) :
 				path -> createBulkWriterOutputFormat((BulkWriter.Factory<RowData>) writer, path);
 	}
 
-	private Object createWriter() {
-		FileSystemFormatFactory formatFactory = createFormatFactory(tableOptions);
+	private DataType getFormatDataType() {
+		TableSchema.Builder builder = TableSchema.builder();
+		schema.getTableColumns().forEach(column -> {
+			if (!partitionKeys.contains(column.getName())) {
+				builder.add(column);
+			}
+		});
+		return builder.build().toRowDataType();
+	}
+
+	private Object createWriter(Context sinkContext) {
+		@SuppressWarnings("rawtypes")
+		Optional<EncodingFormat> encodingFormat = discoverOptionalEncodingFormat(BulkWriterFactory.class)
+				.map(Optional::of).orElseGet(() -> discoverOptionalEncodingFormat(EncoderFactory.class));
+		if (encodingFormat.isPresent()) {
+			return encodingFormat.get().createRuntimeEncoder(sinkContext, getFormatDataType());
+		}
+
+		FileSystemFormatFactory formatFactory = createFormatFactory();

Review comment:
       Why is this still needed? Do we need to migrate all the formats to use `EncodingFormatFactory` and `DecodingFormatFactory` before we can remove this code? If yes, could you create an issue for that?

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -108,13 +121,39 @@ public boolean isBounded() {
 		};
 	}
 
-	private InputFormat<RowData, ?> getInputFormat() {
-		// When this table has no partition, just return a empty source.
-		if (!partitionKeys.isEmpty() && getOrFetchPartitions().isEmpty()) {
-			return new CollectionInputFormat<>(new ArrayList<>(), null);
+	private FileSource<RowData> createBulkFormatSource(
+			BulkDecodingFormat<RowData> decodingFormat, ScanContext scanContext) {
+		decodingFormat.applyLimit(pushedDownLimit());
+		decodingFormat.applyFilters(pushedDownFilters());
+		BulkFormat<RowData> bulkFormat = decodingFormat.createRuntimeDecoder(
+				scanContext, getProducedDataType());
+		FileSource.FileSourceBuilder<RowData> builder = FileSource
+				.forBulkFileFormat(bulkFormat, paths());
+		return builder.build();
+	}
+
+	private Path[] paths() {
+		if (partitionKeys.isEmpty()) {
+			return new Path[] {path};
+		} else {
+			return getOrFetchPartitions().stream()
+					.map(FileSystemTableSource.this::toFullLinkedPartSpec)
+					.map(PartitionPathUtils::generatePartitionPath)
+					.map(n -> new Path(path, n))
+					.toArray(Path[]::new);
 		}
+	}
+
+	private long pushedDownLimit() {
+		return limit == null ? Long.MAX_VALUE : limit;

Review comment:
       I think `Long.MAX_VALUE` can't represent no limit, right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org