You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/07/05 13:34:37 UTC

[flink] 03/03: [FLINK-13088][table-api] Support instantiating the unified TableEnvironment

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 593ff73d903f4e8147f11601e85abefef27fb62e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Thu Jul 4 13:08:02 2019 +0200

    [FLINK-13088][table-api] Support instantiating the unified TableEnvironment
    
    This closes #8984
---
 .../java/internal/StreamTableEnvironmentImpl.java  |  6 +++-
 .../apache/flink/table/api/TableEnvironment.java   | 26 +++++++++++++++++
 .../table/api/internal/TableEnvironmentImpl.java   | 34 ++++++++++++++++++++++
 .../internal/StreamTableEnvironmentImpl.scala      | 10 +++++--
 4 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 4dde3d6..b24c087 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -96,17 +96,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 			StreamExecutionEnvironment executionEnvironment,
 			EnvironmentSettings settings,
 			TableConfig tableConfig) {
+
 		FunctionCatalog functionCatalog = new FunctionCatalog(
 			settings.getBuiltInCatalogName(),
 			settings.getBuiltInDatabaseName());
 		CatalogManager catalogManager = new CatalogManager(
 			settings.getBuiltInCatalogName(),
 			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
-		Map<String, String> plannerProperties = settings.toPlannerProperties();
+
 		Map<String, String> executorProperties = settings.toExecutorProperties();
 		Executor executor = lookupExecutor(executorProperties, executionEnvironment);
+
+		Map<String, String> plannerProperties = settings.toPlannerProperties();
 		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
 			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
 			functionCatalog,
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index 686e816..64ae899 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.api;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
@@ -54,6 +55,31 @@ import java.util.Optional;
 public interface TableEnvironment {
 
 	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs.
+	 *
+	 * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction
+	 * between Scala and Java API) and for bounded and unbounded data processing.
+	 *
+	 * <p>A table environment is responsible for:
+	 * <ul>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to
+	 * other Flink APIs, it might be necessary to use one of the available language-specific table environments
+	 * in the corresponding bridging modules.
+	 *
+	 * @param settings The environment settings used to instantiate the {@link TableEnvironment}.
+	 */
+	static TableEnvironment create(EnvironmentSettings settings) {
+		return TableEnvironmentImpl.create(settings);
+	}
+
+	/**
 	 * Creates a table from a table source.
 	 *
 	 * @param source table source used as table
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index b2c786a..a7c9377 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.CatalogNotExistException;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
@@ -35,15 +36,19 @@ import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.ConnectorCatalogTable;
 import org.apache.flink.table.catalog.ExternalCatalog;
 import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.QueryOperationCatalogView;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.descriptors.TableDescriptor;
 import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.operations.CatalogQueryOperation;
 import org.apache.flink.table.operations.CatalogSinkModifyOperation;
@@ -61,6 +66,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -109,6 +115,34 @@ public class TableEnvironmentImpl implements TableEnvironment {
 		);
 	}
 
+	public static TableEnvironmentImpl create(EnvironmentSettings settings) {
+
+		FunctionCatalog functionCatalog = new FunctionCatalog(
+			settings.getBuiltInCatalogName(),
+			settings.getBuiltInDatabaseName());
+		CatalogManager catalogManager = new CatalogManager(
+			settings.getBuiltInCatalogName(),
+			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+
+		Map<String, String> executorProperties = settings.toExecutorProperties();
+		Executor executor = ComponentFactoryService.find(ExecutorFactory.class, executorProperties)
+			.create(executorProperties);
+
+		TableConfig tableConfig = new TableConfig();
+		Map<String, String> plannerProperties = settings.toPlannerProperties();
+		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
+
+		return new TableEnvironmentImpl(
+			catalogManager,
+			tableConfig,
+			executor,
+			functionCatalog,
+			planner,
+			!settings.isBatchMode()
+		);
+	}
+
 	@VisibleForTesting
 	public Planner getPlanner() {
 		return planner;
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index fedb317..3a9b9fe 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -269,15 +269,18 @@ object StreamTableEnvironmentImpl {
       settings: EnvironmentSettings,
       tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
-    val executorProperties = settings.toExecutorProperties
-    val plannerProperties = settings.toPlannerProperties
-    val executor = lookupExecutor(executorProperties, executionEnvironment)
+
     val functionCatalog = new FunctionCatalog(
       settings.getBuiltInCatalogName,
       settings.getBuiltInDatabaseName)
     val catalogManager = new CatalogManager(
       settings.getBuiltInCatalogName,
       new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+
+    val executorProperties = settings.toExecutorProperties
+    val executor = lookupExecutor(executorProperties, executionEnvironment)
+
+    val plannerProperties = settings.toPlannerProperties
     val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
       .create(
         plannerProperties,
@@ -285,6 +288,7 @@ object StreamTableEnvironmentImpl {
         tableConfig,
         functionCatalog,
         catalogManager)
+
     new StreamTableEnvironmentImpl(
       catalogManager,
       functionCatalog,