You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/02/22 03:50:45 UTC
[flink] 02/06: [FLINK-15912][table] Support create table source/sink by context in sql-cli
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 306a89a3556ca3fbab0306301f56972ccf11641b
Author: JingsongLi <lz...@aliyun.com>
AuthorDate: Tue Feb 11 15:39:59 2020 +0800
[FLINK-15912][table] Support create table source/sink by context in sql-cli
---
.../client/gateway/local/ExecutionContext.java | 37 +++++++++++++++-------
.../gateway/utils/TestTableSinkFactoryBase.java | 11 +++----
.../gateway/utils/TestTableSourceFactoryBase.java | 10 +++---
.../flink/table/catalog/CatalogTableImpl.java | 15 +++++++++
.../table/descriptors/ConnectTableDescriptor.java | 15 +--------
.../table/descriptors/DescriptorProperties.java | 19 +++++++++++
6 files changed, 71 insertions(+), 36 deletions(-)
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index ba69ebd..8bad114 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -48,11 +48,12 @@ import org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.client.config.Environment;
import org.apache.flink.table.client.config.entries.DeploymentEntry;
-import org.apache.flink.table.client.config.entries.ExecutionEntry;
import org.apache.flink.table.client.config.entries.SinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceSinkTableEntry;
import org.apache.flink.table.client.config.entries.SourceTableEntry;
@@ -72,7 +73,9 @@ import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactoryContextImpl;
import org.apache.flink.table.factories.TableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactoryContextImpl;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionService;
@@ -374,12 +377,18 @@ public class ExecutionContext<ClusterID> {
return factory.createCatalog(name, catalogProperties);
}
- private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) {
- if (execution.isStreamingPlanner()) {
+ private TableSource<?> createTableSource(String name, Map<String, String> sourceProperties) {
+ if (environment.getExecution().isStreamingPlanner()) {
final TableSourceFactory<?> factory = (TableSourceFactory<?>)
TableFactoryService.find(TableSourceFactory.class, sourceProperties, classLoader);
- return factory.createTableSource(sourceProperties);
- } else if (execution.isBatchPlanner()) {
+ return factory.createTableSource(new TableSourceFactoryContextImpl(
+ ObjectIdentifier.of(
+ tableEnv.getCurrentCatalog(),
+ tableEnv.getCurrentDatabase(),
+ name),
+ CatalogTableImpl.fromProperties(sourceProperties),
+ tableEnv.getConfig().getConfiguration()));
+ } else if (environment.getExecution().isBatchPlanner()) {
final BatchTableSourceFactory<?> factory = (BatchTableSourceFactory<?>)
TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, classLoader);
return factory.createBatchTableSource(sourceProperties);
@@ -387,12 +396,18 @@ public class ExecutionContext<ClusterID> {
throw new SqlExecutionException("Unsupported execution type for sources.");
}
- private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) {
- if (execution.isStreamingPlanner()) {
+ private TableSink<?> createTableSink(String name, Map<String, String> sinkProperties) {
+ if (environment.getExecution().isStreamingPlanner()) {
final TableSinkFactory<?> factory = (TableSinkFactory<?>)
TableFactoryService.find(TableSinkFactory.class, sinkProperties, classLoader);
- return factory.createTableSink(sinkProperties);
- } else if (execution.isBatchPlanner()) {
+ return factory.createTableSink(new TableSinkFactoryContextImpl(
+ ObjectIdentifier.of(
+ tableEnv.getCurrentCatalog(),
+ tableEnv.getCurrentDatabase(),
+ name),
+ CatalogTableImpl.fromProperties(sinkProperties),
+ tableEnv.getConfig().getConfiguration()));
+ } else if (environment.getExecution().isBatchPlanner()) {
final BatchTableSinkFactory<?> factory = (BatchTableSinkFactory<?>)
TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, classLoader);
return factory.createBatchTableSink(sinkProperties);
@@ -567,10 +582,10 @@ public class ExecutionContext<ClusterID> {
Map<String, TableSink<?>> tableSinks = new HashMap<>();
environment.getTables().forEach((name, entry) -> {
if (entry instanceof SourceTableEntry || entry instanceof SourceSinkTableEntry) {
- tableSources.put(name, createTableSource(environment.getExecution(), entry.asMap(), classLoader));
+ tableSources.put(name, createTableSource(name, entry.asMap()));
}
if (entry instanceof SinkTableEntry || entry instanceof SourceSinkTableEntry) {
- tableSinks.put(name, createTableSink(environment.getExecution(), entry.asMap(), classLoader));
+ tableSinks.put(name, createTableSink(name, entry.asMap()));
}
});
// register table sources
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
index 408bf24..e934959 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSinkFactoryBase.java
@@ -22,9 +22,8 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.Types;
-import org.apache.flink.table.descriptors.DescriptorProperties;
-import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.StreamTableSinkFactory;
+import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.sinks.AppendStreamTableSink;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.table.sinks.TableSink;
@@ -93,12 +92,10 @@ public abstract class TestTableSinkFactoryBase implements StreamTableSinkFactory
}
@Override
- public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
- final DescriptorProperties params = new DescriptorProperties(true);
- params.putProperties(properties);
+ public StreamTableSink<Row> createTableSink(TableSinkFactory.Context context) {
return new TestTableSink(
- SchemaValidator.deriveTableSinkSchema(params),
- properties.get(testProperty));
+ context.getTable().getSchema(),
+ context.getTable().getProperties().get(testProperty));
}
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
index e64ae51..f305b90 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/utils/TestTableSourceFactoryBase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.SchemaValidator;
import org.apache.flink.table.factories.StreamTableSourceFactory;
+import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.sources.DefinedProctimeAttribute;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
@@ -96,14 +97,15 @@ public abstract class TestTableSourceFactoryBase implements StreamTableSourceFac
}
@Override
- public StreamTableSource<Row> createStreamTableSource(Map<String, String> properties) {
+ public StreamTableSource<Row> createTableSource(TableSourceFactory.Context context) {
+ TableSchema schema = context.getTable().getSchema();
final DescriptorProperties params = new DescriptorProperties(true);
- params.putProperties(properties);
+ params.putProperties(context.getTable().getProperties());
final Optional<String> proctime = SchemaValidator.deriveProctimeAttribute(params);
final List<RowtimeAttributeDescriptor> rowtime = SchemaValidator.deriveRowtimeAttributes(params);
return new TestTableSource(
- TableSchemaUtils.getPhysicalSchema(params.getTableSchema(SCHEMA)),
- properties.get(testProperty),
+ schema,
+ context.getTable().getProperties().get(testProperty),
proctime.orElse(null),
rowtime);
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
index 5566fc3..9808b32 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogTableImpl.java
@@ -81,4 +81,19 @@ public class CatalogTableImpl extends AbstractCatalogTable {
return descriptor.asMap();
}
+
+ /**
+ * Constructs a {@link CatalogTableImpl} from a complete set of properties that contains the table schema.
+ */
+ public static CatalogTableImpl fromProperties(Map<String, String> properties) {
+ DescriptorProperties descriptorProperties = new DescriptorProperties();
+ descriptorProperties.putProperties(properties);
+ TableSchema tableSchema = descriptorProperties.getTableSchema(Schema.SCHEMA);
+ descriptorProperties.removeKeyPrefix(Schema.SCHEMA);
+ return new CatalogTableImpl(
+ tableSchema,
+ descriptorProperties.asMap(),
+ ""
+ );
+ }
}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
index fb63382..a7c54c7 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/descriptors/ConnectTableDescriptor.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions;
import javax.annotation.Nullable;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
/**
@@ -80,19 +79,7 @@ public abstract class ConnectTableDescriptor
" use registerTableSource/registerTableSink/registerTableSourceAndSink.");
}
- Map<String, String> schemaProperties = schemaDescriptor.toProperties();
- TableSchema tableSchema = getTableSchema(schemaProperties);
-
- Map<String, String> properties = new HashMap<>(toProperties());
- schemaProperties.keySet().forEach(properties::remove);
-
- CatalogTableImpl catalogTable = new CatalogTableImpl(
- tableSchema,
- properties,
- ""
- );
-
- registration.createTemporaryTable(path, catalogTable);
+ registration.createTemporaryTable(path, CatalogTableImpl.fromProperties(toProperties()));
}
private TableSchema getTableSchema(Map<String, String> schemaProperties) {
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
index f299db4..1f17519 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java
@@ -44,6 +44,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -144,6 +145,24 @@ public class DescriptorProperties {
}
/**
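+ * Removes every mapping whose key starts with the given prefix from these properties, if any is present.
+ *
+ * <p>For example: for prefix "flink", the entry with key "flink.k" and value "v" will be removed.
+ * The match is a plain {@code startsWith} test, so a key such as "flinkx" is removed as well.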
+ */
+ public void removeKeyPrefix(String prefix) {
+ checkNotNull(prefix);
+
+ Iterator<Map.Entry<String, String>> iterator = properties.entrySet().iterator();
+ while (iterator.hasNext()) {
+ String key = iterator.next().getKey();
+ if (key.startsWith(prefix)) {
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
* Adds a class under the given key.
*/
public void putClass(String key, Class<?> clazz) {