You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/22 03:50:45 UTC

[flink] 02/06: [FLINK-15912][table] Support create table source/sink by context in sql-cli

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 306a89a3556ca3fbab0306301f56972ccf11641b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Feb 11 15:39:59 2020 +0800

    [FLINK-15912][table] Support create table source/sink by context in sql-cli
---
 .../client/gateway/local/ExecutionContext.java     | 37 +++++++++++++++-------
 .../gateway/utils/TestTableSinkFactoryBase.java    | 11 +++----
 .../gateway/utils/TestTableSourceFactoryBase.java  | 10 +++---
 .../flink/table/catalog/CatalogTableImpl.java      | 15 +++++++++
 .../table/descriptors/ConnectTableDescriptor.java  | 15 +--------
 .../table/descriptors/DescriptorProperties.java    | 19 +++++++++++
 6 files changed, 71 insertions(+), 36 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ba69ebd..8bad114 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -48,11 +48,12 @@ import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl;
 import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
-import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.SinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
 import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -72,7 +73,9 @@ import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.factories.TableFactoryService;
 import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
 import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.FunctionDefinition;
 import org.apache.flink.table.functions.FunctionService;
@@ -374,12 +377,18 @@ public class ExecutionContext<ClusterID> {
 		return factory.createCatalog(name, catalogProperties);
 	}
 
-	private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
-		if (execution.isStreamingPlanner()) {
+	private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
+		if (environment.getExecution().isStreamingPlanner()) {
 			final TableSourceFactory<?> factory = (TableSourceFactory<?>)
 				TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
-			return factory.createTableSource(sourceProperties);
-		} else if (execution.isBatchPlanner()) {
+			return factory.createTableSource(new TableSourceFactoryContextImpl(
+					ObjectIdentifier.of(
+							tableEnv.getCurrentCatalog(),
+							tableEnv.getCurrentDatabase(),
+							name),
+					CatalogTableImpl.fromProperties(sourceProperties),
+					tableEnv.getConfig().getConfiguration()));
+		} else if (environment.getExecution().isBatchPlanner()) {
 			final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
 				TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
 			return factory.createBatchTableSource(sourceProperties);
@@ -387,12 +396,18 @@ public class ExecutionContext<ClusterID> {
 		throw new SqlExecutionException("Unsupported execution type for sources.");
 	}
 
-	private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
-		if (execution.isStreamingPlanner()) {
+	private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
+		if (environment.getExecution().isStreamingPlanner()) {
 			final TableSinkFactory<?> factory = (TableSinkFactory<?>)
 				TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
-			return factory.createTableSink(sinkProperties);
-		} else if (execution.isBatchPlanner()) {
+			return factory.createTableSink(new TableSinkFactoryContextImpl(
+					ObjectIdentifier.of(
+							tableEnv.getCurrentCatalog(),
+							tableEnv.getCurrentDatabase(),
+							name),
+					CatalogTableImpl.fromProperties(sinkProperties),
+					tableEnv.getConfig().getConfiguration()));
+		} else if (environment.getExecution().isBatchPlanner()) {
 			final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
 				TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
 			return factory.createBatchTableSink(sinkProperties);
@@ -567,10 +582,10 @@ public class ExecutionContext<ClusterID> {
 		Map<String, TableSink<?>> tableSinks = new HashMap<>();
 		environment.getTables().forEach((name, entry) -> {
 			if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
-				tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
+				tableSources.put(name, createTableSource(name, entry.asMap()));
 			}
 			if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
-				tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
+				tableSinks.put(name, createTableSink(name, entry.asMap()));
 			}
 		});
 		// register table sources
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index 408bf24..e934959 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -22,9 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactory;
 import org.apache.flink.table.sinks.AppendStreamTableSink;
 import org.apache.flink.table.sinks.StreamTableSink;
 import org.apache.flink.table.sinks.TableSink;
@@ -93,12 +92,10 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
 	}
 
 	@Override
-	public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
-		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(properties);
+	public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
 		return new TestTableSink(
-				SchemaValidator.deriveTableSinkSchema(params),
-				properties.get(testProperty));
+				context.getTable().getSchema(),
+				context.getTable().getProperties().get(testProperty));
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index e64ae51..f305b90 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.Types;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.descriptors.SchemaValidator;
 import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
 import org.apache.flink.table.sources.DefinedProctimeAttribute;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -96,14 +97,15 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
 	}
 
 	@Override
-	public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+	public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
+		TableSchema schema = context.getTable().getSchema();
 		final DescriptorProperties params = new DescriptorProperties(true);
-		params.putProperties(properties);
+		params.putProperties(context.getTable().getProperties());
 		final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
 		final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
 		return new TestTableSource(
-			TableSchemaUtils.getPhysicalSchema(params.getTableSchema(SCHEMA)),
-			properties.get(testProperty),
+			schema,
+			context.getTable().getProperties().get(testProperty),
 			proctime.orElse(null),
 			rowtime);
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 5566fc3..9808b32 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -81,4 +81,19 @@ public class CatalogTableImpl extends AbstractCatalogTable {
 
 		return descriptor.asMap();
 	}
+
+	/**
+	 * Constructs a {@link CatalogTableImpl} from complete properties that contain the table schema.
+	 */
+	public static CatalogTableImpl fromProperties(Map<String, String> properties) {
+		DescriptorProperties descriptorProperties = new DescriptorProperties();
+		descriptorProperties.putProperties(properties);
+		TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA);
+		descriptorProperties.removeKeyPrefix(Schema.SCHEMA);
+		return new CatalogTableImpl(
+				tableSchema,
+				descriptorProperties.asMap(),
+				""
+		);
+	}
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index fb63382..a7c54c7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 /**
@@ -80,19 +79,7 @@ public abstract class ConnectTableDescriptor
 					" use registerTableSource/registerTableSink/registerTableSourceAndSink.");
 		}
 
-		Map<String, String> schemaProperties = schemaDescriptor.toProperties();
-		TableSchema tableSchema = getTableSchema(schemaProperties);
-
-		Map<String, String> properties = new HashMap<>(toProperties());
-		schemaProperties.keySet().forEach(properties::remove);
-
-		CatalogTableImpl catalogTable = new CatalogTableImpl(
-			tableSchema,
-			properties,
-			""
-		);
-
-		registration.createTemporaryTable(path, catalogTable);
+		registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties()));
 	}
 
 	private TableSchema getTableSchema(Map<String, String> schemaProperties) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index f299db4..1f17519 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -144,6 +145,24 @@ public class DescriptorProperties {
 	}
 
 	/**
+	 * Removes all mappings whose keys start with the given prefix from these properties.
+	 *
+	 * <p>For example: for prefix "flink", an entry with key "flink.k" and value "v" is
+	 * removed. Matching is a plain string-prefix test, so a key such as "flinkx" is removed too.
+	 */
+	public void removeKeyPrefix(String prefix) {
+		checkNotNull(prefix);
+
+		Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
+		while (iterator.hasNext()) {
+			String key = iterator.next().getKey();
+			if (key.startsWith(prefix)) {
+				iterator.remove();
+			}
+		}
+	}
+
+	/**
 	 * Adds a class under the given key.
 	 */
 	public void putClass(String key, Class<?> clazz) {