You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/10/26 15:31:56 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #13767: [FLINK-19787][table-runtime] Migrate Filesystem connector to new table source sink interface

leonardBang commented on a change in pull request #13767:
URL: https://github.com/apache/flink/pull/13767#discussion_r512012227



##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableFactoryTest.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.DataGenTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link DataGenTableSourceFactory}.

Review comment:
       Update the class Javadoc: this test covers `FileSystemTableFactory`, not `DataGenTableSourceFactory`.

##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/filesystem/FileSystemTableFactoryTest.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.filesystem;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.CatalogTableImpl;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.factories.DataGenTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link DataGenTableSourceFactory}.
+ */
+public class FileSystemTableFactoryTest {
+
+	private static final TableSchema TEST_SCHEMA = TableSchema.builder()
+			.field("f0", DataTypes.STRING())
+			.field("f1", DataTypes.BIGINT())
+			.field("f2", DataTypes.BIGINT())
+			.build();
+
+	@Test
+	public void testSourceSink() throws Exception {
+		DescriptorProperties descriptor = new DescriptorProperties();
+		descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+		descriptor.putString("path", "/tmp");
+		descriptor.putString("format", "csv");
+
+		DynamicTableSource source = createSource(descriptor);
+		Assert.assertTrue(source instanceof FileSystemTableSource);
+
+		DynamicTableSink sink = createSink(descriptor);
+		Assert.assertTrue(sink instanceof FileSystemTableSink);
+	}
+
+	@Test
+	public void testLackOptionSource() {
+		DescriptorProperties descriptor = new DescriptorProperties();
+		descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+		descriptor.putString("path", "/tmp");
+
+		try {
+			createSource(descriptor);
+		} catch (ValidationException e) {
+			Throwable cause = e.getCause();
+			Assert.assertTrue(cause.toString(), cause instanceof ValidationException);
+			Assert.assertTrue(cause.getMessage(), cause.getMessage().contains(
+					"Missing required options are:\n\nformat"));
+			return;
+		}
+
+		Assert.fail("Should fail by ValidationException.");
+	}
+
+	@Test
+	public void testLackOptionSink() {
+		DescriptorProperties descriptor = new DescriptorProperties();
+		descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+		descriptor.putString("path", "/tmp");
+
+		try {
+			createSink(descriptor);
+		} catch (ValidationException e) {
+			Throwable cause = e.getCause();
+			Assert.assertTrue(cause.toString(), cause instanceof ValidationException);
+			Assert.assertTrue(cause.getMessage(), cause.getMessage().contains(
+					"Missing required options are:\n\nformat"));
+			return;
+		}
+
+		Assert.fail("Should fail by ValidationException.");
+	}
+
+	@Test
+	public void testUnsupportedOptionSource() {
+		DescriptorProperties descriptor = new DescriptorProperties();
+		descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+		descriptor.putString("path", "/tmp");
+		descriptor.putString("format", "csv");
+		descriptor.putString("my_option", "my");
+
+		try {
+			createSource(descriptor);
+		} catch (ValidationException e) {
+			Throwable cause = e.getCause();
+			Assert.assertTrue(cause.toString(), cause instanceof ValidationException);
+			Assert.assertTrue(cause.getMessage(), cause.getMessage().contains(
+					"Unsupported options:\n\nmy_option"));
+			return;
+		}
+
+		Assert.fail("Should fail by ValidationException.");
+	}
+
+	@Test
+	public void testUnsupportedOptionSink() {
+		DescriptorProperties descriptor = new DescriptorProperties();
+		descriptor.putString(FactoryUtil.CONNECTOR.key(), "filesystem");
+		descriptor.putString("path", "/tmp");
+		descriptor.putString("format", "csv");
+		descriptor.putString("my_option", "my");
+
+		try {
+			createSink(descriptor);
+		} catch (ValidationException e) {
+			Throwable cause = e.getCause();
+			Assert.assertTrue(cause.toString(), cause instanceof ValidationException);
+			Assert.assertTrue(cause.getMessage(), cause.getMessage().contains(
+					"Unsupported options:\n\nmy_option"));
+			return;
+		}
+
+		Assert.fail("Should fail by ValidationException.");
+	}
+
+	private static DynamicTableSource createSource(DescriptorProperties properties) {
+		return FactoryUtil.createTableSource(
+				null,
+				ObjectIdentifier.of("", "", ""),

Review comment:
       It would be better to use a more readable identifier, such as `ObjectIdentifier.of("mycatalog", "mydb", "mytable")`.
   

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSink.java
##########
@@ -351,19 +315,34 @@ public void setStaticPartition(Map<String, String> partitions) {
 	}
 
 	@Override
-	public TableSchema getTableSchema() {
-		return schema;
+	public boolean requiresPartitionGrouping(boolean supportsGrouping) {
+		this.dynamicGrouping = supportsGrouping;
+		return dynamicGrouping;
 	}
 
 	@Override
-	public DataType getConsumedDataType() {
-		return schema.toRowDataType().bridgedTo(RowData.class);
+	public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+		return ChangelogMode.insertOnly();
 	}
 
 	@Override
-	public boolean configurePartitionGrouping(boolean supportsGrouping) {
-		this.dynamicGrouping = supportsGrouping;
-		return dynamicGrouping;
+	public DynamicTableSink copy() {
+		return new FileSystemTableSink(context, overwrite, dynamicGrouping, staticPartitions);
+	}
+
+	@Override
+	public String asSummaryString() {
+		return "filesystem";

Review comment:
       Suggest returning `"FileSystem"` as the summary string here.

##########
File path: flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroFilesystemStreamITCase.java
##########
@@ -32,7 +32,7 @@
 	public String[] additionalProperties() {
 		List<String> ret = new ArrayList<>();
 		ret.add("'format'='avro'");
-		ret.add("'format.avro.codec'='snappy'");

Review comment:
       Since the format options are not validated, I suggest adding at least one test for them in `FileSystemTableFactoryTest`.

##########
File path: flink-table/flink-sql-client/pom.xml
##########
@@ -96,6 +96,12 @@ under the License.
 			<version>${project.version}</version>
 		</dependency>
 
+		<dependency>
+			<groupId>org.apache.flink</groupId>

Review comment:
       Why do we bundle this dependency here? If it is only needed for testing, `test` scope is enough.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableSource.java
##########
@@ -226,87 +195,63 @@ public long getPushedDownLimit() {
 						spec.forEach((k, v) -> ret.put(k, defaultPartName.equals(v) ? null : v));
 						return ret;
 					})
-					.collect(Collectors.toList());
+					.collect(Collectors.toList()));
 		} catch (Exception e) {
 			throw new TableException("Fetch partitions fail.", e);
 		}
 	}
 
 	@Override
-	public FileSystemTableSource applyPartitionPruning(
-			List<Map<String, String>> remainingPartitions) {
-		return new FileSystemTableSource(
-				schema,
-				path,
-				partitionKeys,
-				defaultPartName,
-				properties,
-				remainingPartitions,
-				selectFields,
-				limit,
-				filters);
+	public void applyPartitions(List<Map<String, String>> remainingPartitions) {
+		this.remainingPartitions = remainingPartitions;
 	}
 
 	@Override
-	public FileSystemTableSource projectFields(int[] fields) {
-		return new FileSystemTableSource(
-				schema,
-				path,
-				partitionKeys,
-				defaultPartName,
-				properties,
-				readPartitions,
-				fields,
-				limit,
-				filters);
+	public boolean supportsNestedProjection() {
+		return false;
 	}
 
 	@Override
-	public FileSystemTableSource applyLimit(long limit) {
-		return new FileSystemTableSource(
-				schema,
-				path,
-				partitionKeys,
-				defaultPartName,
-				properties,
-				readPartitions,
-				selectFields,
-				limit,
-				filters);
+	public void applyProjection(int[][] projectedFields) {
+		this.projectedFields = projectedFields;
 	}
 
 	@Override
-	public boolean isLimitPushedDown() {
-		return limit != null;
+	public FileSystemTableSource copy() {
+		return new FileSystemTableSource(context, projectedFields, remainingPartitions, filters, limit);
 	}
 
 	@Override
-	public FileSystemTableSource applyPredicate(List<Expression> predicates) {
-		return new FileSystemTableSource(
-				schema,
-				path,
-				partitionKeys,
-				defaultPartName,
-				properties,
-				readPartitions,
-				selectFields,
-				limit,
-				new ArrayList<>(predicates));
+	public String asSummaryString() {
+		return "filesystem";

Review comment:
       Suggest returning `"Filesystem"` as the summary string here.

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/filesystem/FileSystemTableFactory.java
##########
@@ -48,71 +36,59 @@
  * table or a catalog table.
  * 2.Support insert into (append) and insert overwrite.
  * 3.Support static and dynamic partition inserting.
- *
- * <p>Migrate to new source/sink interface after FLIP-95 is ready.
  */
 public class FileSystemTableFactory implements
-		TableSourceFactory<RowData>,
-		TableSinkFactory<RowData> {
+		DynamicTableSourceFactory,
+		DynamicTableSinkFactory {
 
 	public static final String IDENTIFIER = "filesystem";
 
 	@Override
-	public Map<String, String> requiredContext() {
-		Map<String, String> context = new HashMap<>();
-		context.put(CONNECTOR, IDENTIFIER);
-		return context;
+	public String factoryIdentifier() {
+		return IDENTIFIER;
 	}
 
 	@Override
-	public List<String> supportedProperties() {
-		// contains format properties.
-		return Collections.singletonList("*");
+	public DynamicTableSink createDynamicTableSink(Context context) {
+		validate(FactoryUtil.createTableFactoryHelper(this, context));
+		return new FileSystemTableSink(context);
 	}
 
 	@Override
-	public TableSource<RowData> createTableSource(TableSourceFactory.Context context) {
-		Configuration conf = new Configuration();
-		context.getTable().getOptions().forEach(conf::setString);
-
-		return new FileSystemTableSource(
-				TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()),
-				getPath(conf),
-				context.getTable().getPartitionKeys(),
-				conf.get(PARTITION_DEFAULT_NAME),
-				context.getTable().getProperties());
+	public DynamicTableSource createDynamicTableSource(Context context) {
+		validate(FactoryUtil.createTableFactoryHelper(this, context));
+		return new FileSystemTableSource(context);
 	}
 
-	@Override
-	public TableSink<RowData> createTableSink(TableSinkFactory.Context context) {
-		Configuration conf = new Configuration();
-		context.getTable().getOptions().forEach(conf::setString);
-
-		return new FileSystemTableSink(
-				context.getObjectIdentifier(),
-				context.isBounded(),
-				TableSchemaUtils.getPhysicalSchema(context.getTable().getSchema()),
-				getPath(conf),
-				context.getTable().getPartitionKeys(),
-				conf.get(PARTITION_DEFAULT_NAME),
-				context.getTable().getOptions());
+	private void validate(FactoryUtil.TableFactoryHelper helper) {
+		// Except format options, some format like parquet and orc can not list all support options.

Review comment:
       ```suggestion
   		// Except format options, some formats like parquet and orc can not list all supported options.
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org