You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 13:34:37 UTC
[flink] 03/03: [FLINK-13088][table-api] Support instantiating the
unified TableEnvironment
This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 593ff73d903f4e8147f11601e85abefef27fb62e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 4 13:08:02 2019 +0200
[FLINK-13088][table-api] Support instantiating the unified TableEnvironment
This closes #8984
---
.../java/internal/StreamTableEnvironmentImpl.java | 6 +++-
.../apache/flink/table/api/TableEnvironment.java | 26 +++++++++++++++++
.../table/api/internal/TableEnvironmentImpl.java | 34 ++++++++++++++++++++++
.../internal/StreamTableEnvironmentImpl.scala | 10 +++++--
4 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 4dde3d6..b24c087 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -96,17 +96,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
StreamExecutionEnvironment executionEnvironment,
EnvironmentSettings settings,
TableConfig tableConfig) {
+
FunctionCatalog functionCatalog = new FunctionCatalog(
settings.getBuiltInCatalogName(),
settings.getBuiltInDatabaseName());
CatalogManager catalogManager = new CatalogManager(
settings.getBuiltInCatalogName(),
new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
- Map<String, String> plannerProperties = settings.toPlannerProperties();
+
Map<String, String> executorProperties = settings.toExecutorProperties();
Executor executor = lookupExecutor(executorProperties, executionEnvironment);
+
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+
return new StreamTableEnvironmentImpl(
catalogManager,
functionCatalog,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 686e816..64ae899 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.ExternalCatalog;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
@@ -54,6 +55,31 @@ import java.util.Optional;
public interface TableEnvironment {
/**
+ * Creates a table environment that is the entry point and central context for creating Table &amp; SQL
+ * API programs.
+ *
+ * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction
+ * between Scala and Java API) and for bounded and unbounded data processing.
+ *
+ * <p>A table environment is responsible for:
+ * <ul>
+ * <li>Connecting to external systems.</li>
+ * <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ * <li>Executing SQL statements.</li>
+ * <li>Offering further configuration options.</li>
+ * </ul>
+ *
+ * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to
+ * other Flink APIs, it might be necessary to use one of the available language-specific table environments
+ * in the corresponding bridging modules.
+ *
+ * @param settings The environment settings used to instantiate the {@link TableEnvironment}.
+ */
+ static TableEnvironment create(EnvironmentSettings settings) {
+ return TableEnvironmentImpl.create(settings);
+ }
+
+ /**
* Creates a table from a table source.
*
* @param source table source used as table
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index b2c786a..a7c9377 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
@@ -35,15 +36,19 @@ import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.ConnectorCatalogTable;
import org.apache.flink.table.catalog.ExternalCatalog;
import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.QueryOperationCatalogView;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.descriptors.TableDescriptor;
import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.operations.CatalogQueryOperation;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -61,6 +66,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
/**
@@ -109,6 +115,34 @@ public class TableEnvironmentImpl implements TableEnvironment {
);
}
+ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
+
+ FunctionCatalog functionCatalog = new FunctionCatalog(
+ settings.getBuiltInCatalogName(),
+ settings.getBuiltInDatabaseName());
+ CatalogManager catalogManager = new CatalogManager(
+ settings.getBuiltInCatalogName(),
+ new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+
+ Map<String, String> executorProperties = settings.toExecutorProperties();
+ Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
+ .create(executorProperties);
+
+ TableConfig tableConfig = new TableConfig();
+ Map<String, String> plannerProperties = settings.toPlannerProperties();
+ Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+
+ return new TableEnvironmentImpl(
+ catalogManager,
+ tableConfig,
+ executor,
+ functionCatalog,
+ planner,
+ !settings.isBatchMode()
+ );
+ }
+
@VisibleForTesting
public Planner getPlanner() {
return planner;
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index fedb317..3a9b9fe 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -269,15 +269,18 @@ object StreamTableEnvironmentImpl {
settings: EnvironmentSettings,
tableConfig: TableConfig)
: StreamTableEnvironmentImpl = {
- val executorProperties = settings.toExecutorProperties
- val plannerProperties = settings.toPlannerProperties
- val executor = lookupExecutor(executorProperties, executionEnvironment)
+
val functionCatalog = new FunctionCatalog(
settings.getBuiltInCatalogName,
settings.getBuiltInDatabaseName)
val catalogManager = new CatalogManager(
settings.getBuiltInCatalogName,
new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+
+ val executorProperties = settings.toExecutorProperties
+ val executor = lookupExecutor(executorProperties, executionEnvironment)
+
+ val plannerProperties = settings.toPlannerProperties
val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
.create(
plannerProperties,
@@ -285,6 +288,7 @@ object StreamTableEnvironmentImpl {
tableConfig,
functionCatalog,
catalogManager)
+
new StreamTableEnvironmentImpl(
catalogManager,
functionCatalog,