You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 14:25:43 UTC

[flink] 02/02: [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95944e231315f085d5e23717332aa2866caa5d8a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jun 19 15:00:23 2019 +0200

    [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor
    
    This closes #8852.
---
 flink-python/pyflink/table/table_config.py         |  36 ----
 .../table/tests/test_table_environment_api.py      |  12 --
 .../client/gateway/local/ExecutionContextTest.java |   3 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   7 +-
 .../table/api/java/BatchTableEnvironment.java      |   5 +-
 .../table/api/java/StreamTableEnvironment.java     | 155 +++++++++-----
 .../java/internal/StreamTableEnvironmentImpl.java  |  66 +++---
 .../flink/table/api/EnvironmentSettings.java       | 228 +++++++++++++++++++++
 .../org/apache/flink/table/api/TableConfig.java    |  44 ----
 .../apache/flink/table/api/TableEnvironment.java   |  22 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/delegation/ExecutorFactory.java    |  50 +++++
 .../internal => delegation}/PlannerFactory.java    |  49 ++---
 .../flink/table/factories/ComponentFactory.java    |  53 +++++
 .../table/factories/ComponentFactoryService.java   |  80 ++++++++
 .../factories/ComponentFactoryServiceTest.java     |  68 ++++++
 .../factories/utils/OtherTestPlannerFactory.java   |  28 +++
 .../table/factories/utils/TestPlannerFactory.java  |  69 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   6 +-
 .../table/api/scala/BatchTableEnvironment.scala    |   7 +-
 .../table/api/scala/StreamTableEnvironment.scala   | 132 ++++++++----
 .../internal/StreamTableEnvironmentImpl.scala      |  90 ++++----
 .../flink/table/executor/StreamExecutor.java       |   4 +-
 ...utorFactory.java => StreamExecutorFactory.java} |  49 +++--
 .../flink/table/planner/StreamPlannerFactory.java  |  70 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   2 +
 .../flink/table/api/internal/TableEnvImpl.scala    |   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |  26 ++-
 .../apache/flink/table/utils/TableTestBase.scala   |  76 +++----
 29 files changed, 1052 insertions(+), 393 deletions(-)

diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 7eb3513..d6b5864 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -90,39 +90,3 @@ class TableConfig(object):
             self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
         else:
             raise Exception("TableConfig.max_generated_code_length should be a int value!")
-
-    def get_built_in_catalog_name(self):
-        """
-        Gets the specified name of the initial catalog to be created when instantiating
-        :class:`TableEnvironment`.
-        """
-        return self._j_table_config.getBuiltInCatalogName()
-
-    def set_built_in_catalog_name(self, built_in_catalog_name):
-        """
-        Specifies the name of the initial catalog to be created when instantiating
-        :class:`TableEnvironment`. This method has no effect if called on the
-        :func:`~pyflink.table.TableEnvironment.get_config`.
-        """
-        if built_in_catalog_name is not None and isinstance(built_in_catalog_name, str):
-            self._j_table_config.setBuiltInCatalogName(built_in_catalog_name)
-        else:
-            raise Exception("TableConfig.built_in_catalog_name should be a string value!")
-
-    def get_built_in_database_name(self):
-        """
-        Gets the specified name of the default database in the initial catalog to be created when
-        instantiating :class:`TableEnvironment`.
-        """
-        return self._j_table_config.getBuiltInDatabaseName()
-
-    def set_built_in_database_name(self, built_in_database_name):
-        """
-        Specifies the name of the default database in the initial catalog to be created when
-        instantiating :class:`TableEnvironment`. This method has no effect if called on the
-        :func:`~pyflink.table.TableEnvironment.get_config`.
-        """
-        if built_in_database_name is not None and isinstance(built_in_database_name, str):
-            self._j_table_config.setBuiltInDatabaseName(built_in_database_name)
-        else:
-            raise Exception("TableConfig.built_in_database_name should be a string value!")
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a129d22..4eba1bb 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -170,8 +170,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         table_config.set_max_generated_code_length(32000)
         table_config.set_null_check(False)
         table_config.set_timezone("Asia/Shanghai")
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         env = StreamExecutionEnvironment.get_execution_environment()
         t_env = StreamTableEnvironment.create(env, table_config)
@@ -181,8 +179,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(table_config.get_built_in_database_name(), "test_database")
 
 
 class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
@@ -208,22 +204,16 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         table_config.set_timezone("Asia/Shanghai")
         table_config.set_max_generated_code_length(64000)
         table_config.set_null_check(True)
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         self.assertTrue(table_config.get_null_check())
         self.assertEqual(table_config.get_max_generated_code_length(), 64000)
         self.assertEqual(table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(table_config.get_built_in_database_name(), "test_database")
 
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
         table_config.set_null_check(False)
         table_config.set_timezone("Asia/Shanghai")
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         env = ExecutionEnvironment.get_execution_environment()
         t_env = BatchTableEnvironment.create(env, table_config)
@@ -233,5 +223,3 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(readed_table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(readed_table_config.get_built_in_database_name(), "test_database")
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index ac60fcc..fb0c80c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -110,7 +109,7 @@ public class ExecutionContextTest {
 		assertEquals(
 			new HashSet<>(
 				Arrays.asList(
-					TableConfig.getDefault().getBuiltInCatalogName(),
+					"default_catalog",
 					inmemoryCatalog,
 					hiveCatalog,
 					hiveDefaultVersionCatalog,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 4f92ae7..023d656 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
@@ -76,7 +75,6 @@ import static org.junit.Assert.fail;
 public class LocalExecutorITCase extends TestLogger {
 
 	private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
-
 	private static final int NUM_TMS = 2;
 	private static final int NUM_SLOTS_PER_TM = 2;
 
@@ -158,7 +156,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final List<String> actualCatalogs = executor.listCatalogs(session);
 
 		final List<String> expectedCatalogs = Arrays.asList(
-			TableConfig.getDefault().getBuiltInCatalogName(),
+			"default_catalog",
 			"catalog1");
 		assertEquals(expectedCatalogs, actualCatalogs);
 	}
@@ -170,8 +168,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 		final List<String> actualDatabases = executor.listDatabases(session);
 
-		final List<String> expectedDatabases = Arrays.asList(
-			TableConfig.getDefault().getBuiltInDatabaseName());
+		final List<String> expectedDatabases = Arrays.asList("default_database");
 		assertEquals(expectedDatabases, actualDatabases);
 	}
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 1759b60..16f63af 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -287,9 +287,10 @@ public interface BatchTableEnvironment extends TableEnvironment {
 		try {
 			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
 			Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
+			String defaultCatalog = "default_catalog";
 			CatalogManager catalogManager = new CatalogManager(
-				tableConfig.getBuiltInCatalogName(),
-				new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName())
+				defaultCatalog,
+				new GenericInMemoryCatalog(defaultCatalog, "default_database")
 			);
 			return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
 		} catch (Throwable t) {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index a4f5df2..6859780 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -35,24 +36,115 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 
 /**
- * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
- * {@link DataStream}s.
+ * This table environment is the entry point and central context for creating Table & SQL
+ * API programs that integrate with the Java-specific {@link DataStream} API.
  *
- * <p>A TableEnvironment can be used to:
+ * <p>It is unified for bounded and unbounded data processing.
+ *
+ * <p>A stream table environment is responsible for:
  * <ul>
- *     <li>convert a {@link DataStream} to a {@link Table}</li>
- *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
- *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
- *     <li>scan a registered table to obtain a {@link Table}</li>
- *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
- *     <li>convert a {@link Table} into a {@link DataStream}</li>
- *     <li>explain the AST and execution plan of a {@link Table}</li>
+ *     <li>Converting a {@link DataStream} into a {@link Table} and vice-versa.</li>
+ *     <li>Connecting to external systems.</li>
+ *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ *     <li>Executing SQL statements.</li>
+ *     <li>Offering further configuration options.</li>
  * </ul>
+ *
+ * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+ * for pure table programs.
  */
 @PublicEvolving
 public interface StreamTableEnvironment extends TableEnvironment {
 
 	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs that integrate with the Java-specific {@link DataStream} API.
+	 *
+	 * <p>It is unified for bounded and unbounded data processing.
+	 *
+	 * <p>A stream table environment is responsible for:
+	 * <ul>
+	 *     <li>Converting a {@link DataStream} into a {@link Table} and vice-versa.</li>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+	 * for pure table programs.
+	 *
+	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+	 */
+	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
+		return create(
+			executionEnvironment,
+			EnvironmentSettings.newInstance().build());
+	}
+
+	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs that integrate with the Java-specific {@link DataStream} API.
+	 *
+	 * <p>It is unified for bounded and unbounded data processing.
+	 *
+	 * <p>A stream table environment is responsible for:
+	 * <ul>
+	 *     <li>Converting a {@link DataStream} into a {@link Table} and vice-versa.</li>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+	 * for pure table programs.
+	 *
+	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+	 * @param settings The environment settings used to instantiate the {@link TableEnvironment}.
+	 */
+	static StreamTableEnvironment create(
+			StreamExecutionEnvironment executionEnvironment,
+			EnvironmentSettings settings) {
+		return StreamTableEnvironmentImpl.create(
+			executionEnvironment,
+			settings,
+			new TableConfig()
+		);
+	}
+
+	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs that integrate with the Java-specific {@link DataStream} API.
+	 *
+	 * <p>It is unified for bounded and unbounded data processing.
+	 *
+	 * <p>A stream table environment is responsible for:
+	 * <ul>
+	 *     <li>Converting a {@link DataStream} into a {@link Table} and vice-versa.</li>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+	 * for pure table programs.
+	 *
+	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+	 * @param tableConfig The configuration of the {@link TableEnvironment}.
+	 * @deprecated Use {@link #create(StreamExecutionEnvironment)} and {@link #getConfig()}
+	 * for manipulating {@link TableConfig}.
+	 */
+	@Deprecated
+	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
+		return StreamTableEnvironmentImpl.create(
+			executionEnvironment,
+			EnvironmentSettings.newInstance().build(),
+			tableConfig);
+	}
+
+	/**
 	 * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog.
 	 * Registered functions can be referenced in Table API and SQL queries.
 	 *
@@ -356,47 +448,4 @@ public interface StreamTableEnvironment extends TableEnvironment {
 	 */
 	@Override
 	StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
-
-	/**
-	 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
-	 * {@link DataStream}s.
-	 *
-	 * <p>A TableEnvironment can be used to:
-	 * <ul>
-	 *     <li>convert a {@link DataStream} to a {@link Table}</li>
-	 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>scan a registered table to obtain a {@link Table}</li>
-	 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
-	 *     <li>convert a {@link Table} into a {@link DataStream}</li>
-	 *     <li>explain the AST and execution plan of a {@link Table}</li>
-	 * </ul>
-	 *
-	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 */
-	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
-		return create(executionEnvironment, new TableConfig());
-	}
-
-	/**
-	 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
-	 * {@link DataStream}s.
-	 *
-	 * <p>A TableEnvironment can be used to:
-	 * <ul>
-	 *     <li>convert a {@link DataStream} to a {@link Table}</li>
-	 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>scan a registered table to obtain a {@link Table}</li>
-	 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
-	 *     <li>convert a {@link Table} into a {@link DataStream}</li>
-	 *     <li>explain the AST and execution plan of a {@link Table}</li>
-	 * </ul>
-	 *
-	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 * @param tableConfig The configuration of the TableEnvironment.
-	 */
-	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
-		return StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment);
-	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 0ea02a3..6b37690 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.java.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,24 +28,27 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.internal.PlannerFactory;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
@@ -60,6 +64,7 @@ import org.apache.flink.table.typeutils.FieldInfoUtils;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -73,7 +78,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
 	private final StreamExecutionEnvironment executionEnvironment;
 
-	private StreamTableEnvironmentImpl(
+	@VisibleForTesting
+	public StreamTableEnvironmentImpl(
 			CatalogManager catalogManager,
 			FunctionCatalog functionCatalog,
 			TableConfig tableConfig,
@@ -92,33 +98,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 	 * @param tableConfig The configuration of the TableEnvironment.
 	 * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
 	 */
-	public static StreamTableEnvironmentImpl create(
-			TableConfig tableConfig,
-			StreamExecutionEnvironment executionEnvironment) {
-		CatalogManager catalogManager = new CatalogManager(
-			tableConfig.getBuiltInCatalogName(),
-			new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName()));
-		return create(catalogManager, tableConfig, executionEnvironment);
-	}
-
-	/**
-	 * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for
-	 * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting
-	 * from/to {@link DataStream}.
-	 *
-	 * @param catalogManager The {@link CatalogManager} to use for storing and looking up {@link Table}s.
-	 * @param tableConfig The configuration of the TableEnvironment.
-	 * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 */
-	public static StreamTableEnvironmentImpl create(
-			CatalogManager catalogManager,
-			TableConfig tableConfig,
-			StreamExecutionEnvironment executionEnvironment) {
+	public static StreamTableEnvironment create(
+			StreamExecutionEnvironment executionEnvironment,
+			EnvironmentSettings settings,
+			TableConfig tableConfig) {
 		FunctionCatalog functionCatalog = new FunctionCatalog(
-			catalogManager.getCurrentCatalog(),
-			catalogManager.getCurrentDatabase());
-		Executor executor = lookupExecutor(executionEnvironment);
-		Planner planner = PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager);
+			settings.getBuiltInCatalogName(),
+			settings.getBuiltInDatabaseName());
+		CatalogManager catalogManager = new CatalogManager(
+			settings.getBuiltInCatalogName(),
+			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+		Map<String, String> plannerProperties = settings.toPlannerProperties();
+		Map<String, String> executorProperties = settings.toExecutorProperties();
+		Executor executor = lookupExecutor(executorProperties, executionEnvironment);
+		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
 			functionCatalog,
@@ -129,12 +123,18 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		);
 	}
 
-	private static Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+	private static Executor lookupExecutor(
+			Map<String, String> executorProperties,
+			StreamExecutionEnvironment executionEnvironment) {
 		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory");
-			Method createMethod = clazz.getMethod("create", StreamExecutionEnvironment.class);
-
-			return (Executor) createMethod.invoke(null, executionEnvironment);
+			ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+			Method createMethod = executorFactory.getClass()
+				.getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+			return (Executor) createMethod.invoke(
+				executorFactory,
+				executorProperties,
+				executionEnvironment);
 		} catch (Exception e) {
 			throw new TableException(
 				"Could not instantiate the executor. Make sure a planner module is on the classpath",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
new file mode 100644
index 0000000..37ba179
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines all parameters that initialize a table environment. Those parameters are used only
+ * during instantiation of a {@link TableEnvironment} and cannot be changed afterwards.
+ *
+ * <p>Example:
+ * <pre>{@code
+ *    EnvironmentSettings.newInstance()
+ *      .useOldPlanner()
+ *      .inStreamingMode()
+ *      .withBuiltInCatalogName("default_catalog")
+ *      .withBuiltInDatabaseName("default_database")
+ *      .build()
+ * }</pre>
+ */
+@PublicEvolving
+public class EnvironmentSettings {
+	public static final String BATCH_MODE = "batch-mode";
+	public static final String CLASS_NAME = "class-name";
+
+	/**
+	 * Canonical name of the {@link Planner} class to use.
+	 */
+	private final String plannerClass;
+
+	/**
+	 * Canonical name of the {@link Executor} class to use.
+	 */
+	private final String executorClass;
+
+	/**
+	 * Specifies the name of the initial catalog to be created when instantiating
+	 * {@link TableEnvironment}.
+	 */
+	private final String builtInCatalogName;
+
+	/**
+	 * Specifies the name of the default database in the initial catalog to be created when
+	 * instantiating {@link TableEnvironment}.
+	 */
+	private final String builtInDatabaseName;
+
+	/**
+	 * Determines if the table environment should work in a batch ({@code true}) or
+	 * streaming ({@code false}) mode.
+	 */
+	private final boolean isBatchMode;
+
+	private EnvironmentSettings(
+			@Nullable String plannerClass,
+			@Nullable String executorClass,
+			String builtInCatalogName,
+			String builtInDatabaseName,
+			boolean isBatchMode) {
+		this.plannerClass = plannerClass;
+		this.executorClass = executorClass;
+		this.builtInCatalogName = builtInCatalogName;
+		this.builtInDatabaseName = builtInDatabaseName;
+		this.isBatchMode = isBatchMode;
+	}
+
+	/**
+	 * Creates a builder for creating an instance of {@link EnvironmentSettings}.
+	 *
+	 * <p>By default, it does not specify a required planner and will use the one that is available
+	 * on the classpath via discovery.
+	 */
+	public static Builder newInstance() {
+		return new Builder();
+	}
+
+	/**
+	 * Gets the specified name of the initial catalog to be created when instantiating
+	 * a {@link TableEnvironment}.
+	 */
+	public String getBuiltInCatalogName() {
+		return builtInCatalogName;
+	}
+
+	/**
+	 * Gets the specified name of the default database in the initial catalog to be created when instantiating
+	 * a {@link TableEnvironment}.
+	 */
+	public String getBuiltInDatabaseName() {
+		return builtInDatabaseName;
+	}
+
+	@Internal
+	public Map<String, String> toPlannerProperties() {
+		Map<String, String> properties = new HashMap<>(toCommonProperties());
+		if (plannerClass != null) {
+			properties.put(CLASS_NAME, plannerClass);
+		}
+		return properties;
+	}
+
+	@Internal
+	public Map<String, String> toExecutorProperties() {
+		Map<String, String> properties = new HashMap<>(toCommonProperties());
+		if (executorClass != null) {
+			properties.put(CLASS_NAME, executorClass);
+		}
+		return properties;
+	}
+
+	private Map<String, String> toCommonProperties() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put(BATCH_MODE, Boolean.toString(isBatchMode));
+		return properties;
+	}
+
+	/**
+	 * A builder for {@link EnvironmentSettings}.
+	 */
+	public static class Builder {
+		private String plannerClass = null;
+		private String executorClass = null;
+		private String builtInCatalogName = "default_catalog";
+		private String builtInDatabaseName = "default_database";
+		private boolean isBatchMode = false;
+
+		/**
+		 * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
+		 * enabled.
+		 */
+		public Builder useOldPlanner() {
+			this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory";
+			this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory";
+			return this;
+		}
+
+		/**
+		 * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is
+		 * enabled.
+		 */
+		public Builder useBlinkPlanner() {
+			throw new UnsupportedOperationException("The Blink planner is not supported yet.");
+		}
+
+		/**
+		 * Does not set a planner requirement explicitly.
+		 *
+		 * <p>A planner will be discovered automatically, if there is only one planner available.
+		 *
+		 * <p>This is the default behavior.
+		 */
+		public Builder useAnyPlanner() {
+			this.plannerClass = null;
+			this.executorClass = null;
+			return this;
+		}
+
+		/**
+		 * Sets that the components should work in a batch mode. Streaming mode by default.
+		 */
+		public Builder inBatchMode() {
+			this.isBatchMode = true;
+			return this;
+		}
+
+		/**
+		 * Sets that the components should work in a streaming mode. Enabled by default.
+		 */
+		public Builder inStreamingMode() {
+			this.isBatchMode = false;
+			return this;
+		}
+
+		/**
+		 * Specifies the name of the initial catalog to be created when instantiating
+		 * a {@link TableEnvironment}. Default: "default_catalog".
+		 */
+		public Builder withBuiltInCatalogName(String builtInCatalogName) {
+			this.builtInCatalogName = builtInCatalogName;
+			return this;
+		}
+
+		/**
+		 * Specifies the name of the default database in the initial catalog to be created when instantiating
+		 * a {@link TableEnvironment}. Default: "default_database".
+		 */
+		public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
+			this.builtInDatabaseName = builtInDatabaseName;
+			return this;
+		}
+
+		/**
+		 * Returns an immutable instance of {@link EnvironmentSettings}.
+		 */
+		public EnvironmentSettings build() {
+			return new EnvironmentSettings(
+				plannerClass,
+				executorClass,
+				builtInCatalogName,
+				builtInDatabaseName,
+				isBatchMode);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 8e5ba8a..325732f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -59,18 +59,6 @@ public class TableConfig {
 	private Integer maxGeneratedCodeLength = 64000; // just an estimate
 
 	/**
-	 * Specifies the name of the initial catalog to be created when instantiating
-	 * TableEnvironment.
-	 */
-	private String builtInCatalogName = "default_catalog";
-
-	/**
-	 * Specifies the name of the default database in the initial catalog to be created when instantiating
-	 * TableEnvironment.
-	 */
-	private String builtInDatabaseName = "default_database";
-
-	/**
 	 * Returns the timezone for date/time/timestamp conversions.
 	 */
 	public TimeZone getTimeZone() {
@@ -147,38 +135,6 @@ public class TableConfig {
 		this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
 	}
 
-	/**
-	 * Gets the specified name of the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}.
-	 */
-	public String getBuiltInCatalogName() {
-		return builtInCatalogName;
-	}
-
-	/**
-	 * Specifies the name of the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
-	 */
-	public void setBuiltInCatalogName(String builtInCatalogName) {
-		this.builtInCatalogName = builtInCatalogName;
-	}
-
-	/**
-	 * Gets the specified name of the default database in the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}.
-	 */
-	public String getBuiltInDatabaseName() {
-		return builtInDatabaseName;
-	}
-
-	/**
-	 * Specifies the name of the default database in the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
-	 */
-	public void setBuiltInDatabaseName(String builtInDatabaseName) {
-		this.builtInDatabaseName = builtInDatabaseName;
-	}
-
 	public static TableConfig getDefault() {
 		return new TableConfig();
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index b2b50a0..85d20f6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -31,21 +31,23 @@ import org.apache.flink.table.sources.TableSource;
 import java.util.Optional;
 
 /**
- * The base class for batch and stream TableEnvironments.
+ * A table environment is the base class, entry point, and central context for creating Table & SQL
+ * API programs.
  *
- * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
- * responsible for:
+ * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction
+ * between Scala and Java API) and for bounded and unbounded data processing.
  *
+ * <p>A table environment is responsible for:
  * <ul>
- *     <li>Registering a Table in the internal catalog</li>
- *     <li>Registering an external catalog</li>
- *     <li>Executing SQL queries</li>
- *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
- *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
+ *     <li>Connecting to external systems.</li>
+ *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ *     <li>Executing SQL statements.</li>
+ *     <li>Offering further configuration options.</li>
  * </ul>
  *
- * <p>This environment is unified both on a language level (for all JVM-based languages, i.e. no distinction between
- * Scala and Java API) and for bounded and unbounded data processing.
+ * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to
+ * other Flink APIs, it might be necessary to use one of the available language-specific table environments
+ * in the corresponding bridging modules.
  */
 @PublicEvolving
 public interface TableEnvironment {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 40849aa..727727a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -98,8 +98,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 		this.tableConfig = tableConfig;
 		this.tableConfig.addPlannerConfig(queryConfigProvider);
-		this.defaultCatalogName = tableConfig.getBuiltInCatalogName();
-		this.defaultDatabaseName = tableConfig.getBuiltInDatabaseName();
+		this.defaultCatalogName = catalogManager.getCurrentCatalog();
+		this.defaultDatabaseName = catalogManager.getCurrentDatabase();
 
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
new file mode 100644
index 0000000..7de03c8
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.factories.ComponentFactory;
+
+import java.util.Map;
+
+/**
+ * Factory that creates {@link Executor}.
+ *
+ * <p>This factory is discovered via Java's Service Provider Interface (SPI). A factory is
+ * called with a set of normalized properties that describe the desired configuration. Those properties
+ * may include execution configuration such as watermark interval or max parallelism, as well as
+ * table-specific initialization configuration such as whether queries should be executed in batch mode.
+ *
+ * <p><b>Important:</b> Implementations of this interface should also implement the method
+ * <pre>{@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);}
+ * </pre> This method will be used when instantiating a {@link org.apache.flink.table.api.TableEnvironment} from a
+ * bridging module which enables conversion from/to the {@code DataStream} API and requires a pre-configured
+ * {@code StreamTableEnvironment}.
+ */
+@Internal
+public interface ExecutorFactory extends ComponentFactory {
+
+	/**
+	 * Creates a corresponding {@link Executor}.
+	 *
+	 * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup.
+	 * @return instance of a {@link Executor}
+	 */
+	Executor create(Map<String, String> properties);
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
similarity index 55%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
index fa1128e..0df52d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
@@ -16,54 +16,41 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api.internal;
+package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.ComponentFactory;
 
-import java.lang.reflect.Constructor;
+import java.util.Map;
 
 /**
- * Factory to construct a {@link Planner}. It will look for the planner on the classpath.
+ * Factory that creates {@link Planner}.
+ *
+ * <p>This factory is discovered via Java's Service Provider Interface (SPI). A factory is
+ * called with a set of normalized properties that describe the desired configuration. Those properties
+ * may include execution configuration such as watermark interval or max parallelism, as well as
+ * table-specific initialization configuration such as whether queries should be executed in batch mode.
  */
 @Internal
-public final class PlannerFactory {
+public interface PlannerFactory extends ComponentFactory {
 
 	/**
-	 * Looks up {@link Planner} on the class path via reflection.
+	 * Creates a corresponding {@link Planner}.
 	 *
+	 * @param properties Static properties of the {@link Planner}, the same that were used for factory lookup.
 	 * @param executor The executor required by the planner.
 	 * @param tableConfig The configuration of the planner to use.
 	 * @param functionCatalog The function catalog to look up user defined functions.
 	 * @param catalogManager The catalog manager to look up tables and views.
 	 * @return instance of a {@link Planner}
 	 */
-	public static Planner lookupPlanner(
-			Executor executor,
-			TableConfig tableConfig,
-			FunctionCatalog functionCatalog,
-			CatalogManager catalogManager) {
-		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.planner.StreamPlanner");
-			Constructor con = clazz.getConstructor(
-				Executor.class,
-				TableConfig.class,
-				FunctionCatalog.class,
-				CatalogManager.class);
-
-			return (Planner) con.newInstance(executor, tableConfig, functionCatalog, catalogManager);
-		} catch (Exception e) {
-			throw new TableException(
-				"Could not instantiate the planner. Make sure the planner module is on the classpath",
-				e);
-		}
-	}
-
-	private PlannerFactory() {
-	}
+	Planner create(
+		Map<String, String> properties,
+		Executor executor,
+		TableConfig tableConfig,
+		FunctionCatalog functionCatalog,
+		CatalogManager catalogManager);
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java
new file mode 100644
index 0000000..45e87235
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A factory interface for components that enables further disambiguation in case
+ * there are multiple matching implementations present.
+ */
+@PublicEvolving
+public interface ComponentFactory extends TableFactory {
+	/**
+	 * Specifies a context of optional parameters that, if present, must have the
+	 * given values. This enables further disambiguation if there are multiple
+	 * factories that meet the {@link #requiredContext()} and {@link #supportedProperties()}.
+	 *
+	 * <p><b>NOTE:</b> All the property keys should be included in {@link #supportedProperties()}.
+	 *
+	 * @return optional properties to disambiguate factories
+	 */
+	Map<String, String> optionalContext();
+
+	@Override
+	Map<String, String> requiredContext();
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p><b>NOTE:</b> All the property keys from {@link #optionalContext()} should also be included.
+	 */
+	@Override
+	List<String> supportedProperties();
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
new file mode 100644
index 0000000..db8e9ba
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.AmbiguousTableFactoryException;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Unified class to search for a {@link ComponentFactory} of provided type and properties. It is similar to
+ * {@link TableFactoryService} but it disambiguates based on {@link ComponentFactory#optionalContext()}.
+ */
+@Internal
+public class ComponentFactoryService {
+
+	/**
+	 * Finds a table factory of the given class and property map. This method enables
+	 * disambiguating multiple matching {@link ComponentFactory}s based on additional
+	 * optional context provided via {@link ComponentFactory#optionalContext()}.
+	 *
+	 * @param factoryClass desired factory class
+	 * @param propertyMap properties that describe the factory configuration
+	 * @param <T> factory class type
+	 * @return the matching factory
+	 */
+	public static <T extends ComponentFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
+		List<T> all = TableFactoryService.findAll(factoryClass, propertyMap);
+
+		List<T> filtered = all.stream().filter(factory -> {
+			Map<String, String> optionalContext = factory.optionalContext();
+			return optionalContext.entrySet().stream().allMatch(entry -> {
+					String property = propertyMap.get(entry.getKey());
+					if (property != null) {
+						return property.equals(entry.getValue());
+					} else {
+						return true;
+					}
+				}
+			);
+		}).collect(Collectors.toList());
+
+		if (filtered.size() > 1) {
+			throw new AmbiguousTableFactoryException(
+				filtered,
+				factoryClass,
+				new ArrayList<>(all),
+				propertyMap
+			);
+		} else if (filtered.isEmpty()) {
+			throw new NoMatchingTableFactoryException(
+				"No factory supports the additional filters.",
+				factoryClass,
+				new ArrayList<>(all),
+				propertyMap);
+		} else {
+			return filtered.get(0);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
new file mode 100644
index 0000000..13e821d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.utils.TestPlannerFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ComponentFactoryService}.
+ */
+public class ComponentFactoryServiceTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testLookingUpAmbiguousPlanners() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());
+		properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+		properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
+
+		PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);
+
+		assertThat(plannerFactory, instanceOf(TestPlannerFactory.class));
+	}
+
+	@Test
+	public void testLookingUpNonExistentClass() {
+		thrown.expect(NoMatchingTableFactoryException.class);
+		thrown.expectMessage("Reason: No factory supports the additional filters");
+
+		Map<String, String> properties = new HashMap<>();
+		properties.put(EnvironmentSettings.CLASS_NAME, "NoSuchClass");
+		properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+		properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
+
+		ComponentFactoryService.find(PlannerFactory.class, properties);
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
new file mode 100644
index 0000000..c4c3804
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils;
+
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.ComponentFactoryServiceTest;
+
+/**
+ * Test {@link Planner} factory used in {@link ComponentFactoryServiceTest}.
+ */
+public class OtherTestPlannerFactory extends TestPlannerFactory {
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
new file mode 100644
index 0000000..1a0f05f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test {@link Planner} factory used in {@link org.apache.flink.table.factories.ComponentFactoryServiceTest}.
+ */
+public class TestPlannerFactory implements PlannerFactory {
+
+	public static final String PLANNER_TYPE_KEY = "planner-type";
+	public static final String PLANNER_TYPE_VALUE = "test-planner";
+
+	@Override
+	public Planner create(
+		Map<String, String> properties, Executor executor,
+		TableConfig tableConfig,
+		FunctionCatalog functionCatalog,
+		CatalogManager catalogManager) {
+		return null;
+	}
+
+	@Override
+	public Map<String, String> optionalContext() {
+		HashMap<String, String> map = new HashMap<>();
+		map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+		return map;
+	}
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> map = new HashMap<>();
+		map.put(PLANNER_TYPE_KEY, PLANNER_TYPE_VALUE);
+		return map;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.BATCH_MODE);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 77%
copy from flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ece39eb..e2a6a48 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,7 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.sources.CsvBatchTableSourceFactory
-org.apache.flink.table.sources.CsvAppendTableSourceFactory
-org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
+org.apache.flink.table.factories.utils.TestPlannerFactory
+org.apache.flink.table.factories.utils.OtherTestPlannerFactory
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 7018af2..fa6b178 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -233,11 +233,12 @@ object BatchTableEnvironment {
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
           classOf[CatalogManager])
+      // One name shared by the manager's default catalog and the built-in catalog instance.
+      val defaultCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
-        tableConfig.getBuiltInCatalogName,
+        defaultCatalog,
         new GenericInMemoryCatalog(
-          tableConfig.getBuiltInCatalogName,
-          tableConfig.getBuiltInDatabaseName)
+          defaultCatalog,
+          "default_database")
       )
       const.newInstance(executionEnvironment, tableConfig, catalogManager)
         .asInstanceOf[BatchTableEnvironment]
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 473522e..3baa5fa 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,28 +17,33 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{TableEnvironment, _}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
 
 /**
-  * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-  * [[DataStream]]s.
+  * This table environment is the entry point and central context for creating Table & SQL
+  * API programs that integrate with the Scala-specific [[DataStream]] API.
   *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataStream]] to a [[Table]]
-  * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataStream]]
-  * - explain the AST and execution plan of a [[Table]]
+  * It is unified for bounded and unbounded data processing.
+  *
+  * A stream table environment is responsible for:
+  *
+  * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+  * - Connecting to external systems.
+  * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+  * - Executing SQL statements.
+  * - Offering further configuration options.
+  *
+  * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for pure
+  * table programs.
   */
+@PublicEvolving
 trait StreamTableEnvironment extends TableEnvironment {
 
   /**
@@ -245,43 +250,90 @@ trait StreamTableEnvironment extends TableEnvironment {
 object StreamTableEnvironment {
 
   /**
-    * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-    * [[DataStream]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataStream]] to a [[Table]]
-    * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataStream]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * This overload delegates to `create(executionEnvironment, settings)` with the settings
+    * produced by `EnvironmentSettings.newInstance().build()`.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
     */
   def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment = {
-    create(executionEnvironment, new TableConfig)
+    create(
+      executionEnvironment,
+      EnvironmentSettings.newInstance().build())
   }
 
   /**
-    * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-    * [[DataStream]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataStream]] to a [[Table]]
-    * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataStream]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
-    * @param tableConfig The configuration of the TableEnvironment.
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * The environment is created with a fresh [[TableConfig]].
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
+    * @param settings The environment settings used to instantiate the [[TableEnvironment]].
+    */
+  def create(
+      executionEnvironment: StreamExecutionEnvironment,
+      settings: EnvironmentSettings)
+    : StreamTableEnvironment = {
+    StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)
+  }
+
+  /**
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Converting a [[DataStream]] into a [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * The environment is built with the settings produced by
+    * `EnvironmentSettings.newInstance().build()` and the given [[TableConfig]].
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
+    * @param tableConfig The configuration of the [[TableEnvironment]].
+    * @deprecated Use [[create(StreamExecutionEnvironment)]] and
+    *             [[StreamTableEnvironment#getConfig()]] for manipulating the [[TableConfig]].
     */
+  @deprecated("Use create(StreamExecutionEnvironment) and getConfig to adjust the TableConfig.")
   def create(executionEnvironment: StreamExecutionEnvironment, tableConfig: TableConfig)
     : StreamTableEnvironment = {
 
-    StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment)
+    StreamTableEnvironmentImpl
+      .create(
+        executionEnvironment,
+        EnvironmentSettings.newInstance().build(),
+        tableConfig)
   }
 }
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 33b304d..96b7403 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -17,9 +17,6 @@
  */
 package org.apache.flink.table.api.scala.internal
 
-import java.util
-import java.util.{Collections, List => JList}
-
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
@@ -29,18 +26,22 @@ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.{PlannerFactory, TableEnvironmentImpl}
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
-import org.apache.flink.table.delegation.{Executor, Planner}
+import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner, PlannerFactory}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.factories.ComponentFactoryService
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper}
 import org.apache.flink.table.operations.{OutputConversionModifyOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 
+import java.util
+import java.util.{Collections, List => JList, Map => JMap}
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -234,61 +235,54 @@ class StreamTableEnvironmentImpl (
 
 object StreamTableEnvironmentImpl {
 
-  /**
-    * Creates an instance of a [[StreamTableEnvironment]]. It uses the
-    * [[StreamExecutionEnvironment]] for executing queries. This is also the
-    * [[StreamExecutionEnvironment]] that will be used when converting
-    * from/to [[DataStream]].
-    *
-    * @param tableConfig The configuration of the TableEnvironment.
-    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment.
-    */
+  /**
+    * Creates a [[StreamTableEnvironmentImpl]] for the given [[StreamExecutionEnvironment]].
+    *
+    * The [[Executor]] and the [[Planner]] are discovered through [[ComponentFactoryService]]
+    * using the executor and planner properties derived from `settings`. The [[CatalogManager]]
+    * and the [[FunctionCatalog]] are initialized with the built-in catalog and database names
+    * of `settings`.
+    *
+    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the table environment.
+    * @param settings The [[EnvironmentSettings]] used for planner/executor lookup and for the
+    *                 built-in catalog and database names.
+    * @param tableConfig The configuration of the table environment.
+    */
   def create(
-      tableConfig: TableConfig,
-      executionEnvironment: StreamExecutionEnvironment)
+      executionEnvironment: StreamExecutionEnvironment,
+      settings: EnvironmentSettings,
+      tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
-    val catalogManager = new CatalogManager(
-      tableConfig.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(
-        tableConfig.getBuiltInCatalogName,
-        tableConfig.getBuiltInDatabaseName)
-    )
-    create(catalogManager, tableConfig, executionEnvironment)
-  }
-
-  /**
-    * Creates an instance of a [[StreamTableEnvironment]]. It uses the
-    * [[StreamExecutionEnvironment]] for executing queries. This is also the
-    * [[StreamExecutionEnvironment]] that will be used when converting
-    * from/to [[DataStream]].
-    *
-    * @param catalogManager The [[CatalogManager]] to use for storing and looking up [[Table]]s.
-    * @param tableConfig The configuration of the TableEnvironment.
-    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment.
-    */
-  def create(
-      catalogManager: CatalogManager,
-      tableConfig: TableConfig,
-      executionEnvironment: StreamExecutionEnvironment)
-    : StreamTableEnvironmentImpl = {
-    val executor = lookupExecutor(executionEnvironment)
+    val executorProperties = settings.toExecutorProperties
+    val plannerProperties = settings.toPlannerProperties
+    val executor = lookupExecutor(executorProperties, executionEnvironment)
     val functionCatalog = new FunctionCatalog(
-      catalogManager.getCurrentCatalog,
-      catalogManager.getCurrentDatabase)
+      settings.getBuiltInCatalogName,
+      settings.getBuiltInDatabaseName)
+    val catalogManager = new CatalogManager(
+      settings.getBuiltInCatalogName,
+      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+    val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
+      .create(
+        plannerProperties,
+        executor,
+        tableConfig,
+        functionCatalog,
+        catalogManager)
     new StreamTableEnvironmentImpl(
       catalogManager,
       functionCatalog,
       tableConfig,
       executionEnvironment,
-      PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager),
+      planner,
       executor)
   }
 
-  private def lookupExecutor(executionEnvironment: StreamExecutionEnvironment) =
+  private def lookupExecutor(
+      executorProperties: JMap[String, String],
+      executionEnvironment: StreamExecutionEnvironment)
+    :Executor =
     try {
-      val clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory")
-      val createMethod = clazz.getMethod("create", classOf[JStreamExecutionEnvironment])
-      createMethod.invoke(null, executionEnvironment.getWrappedStreamExecutionEnvironment)
+      val executorFactory = ComponentFactoryService
+        .find(classOf[ExecutorFactory], executorProperties)
+      val createMethod = executorFactory.getClass
+        .getMethod(
+          "create",
+          classOf[util.Map[String, String]],
+          classOf[JStreamExecutionEnvironment])
+
+      createMethod
+        .invoke(
+          executorFactory,
+          executorProperties,
+          executionEnvironment.getWrappedStreamExecutionEnvironment)
         .asInstanceOf[Executor]
     } catch {
       case e: Exception =>
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
index 5148839..426e09e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.executor;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +35,8 @@ import java.util.List;
 public class StreamExecutor implements Executor {
 	private final StreamExecutionEnvironment executionEnvironment;
 
-	StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
+	/**
+	 * Creates an executor that keeps a reference to the given {@link StreamExecutionEnvironment}.
+	 *
+	 * <p>The constructor is public so that tests can instantiate the executor directly.
+	 *
+	 * @param executionEnvironment the environment stored by this executor
+	 */
+	@VisibleForTesting
+	public StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
 		this.executionEnvironment = executionEnvironment;
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
similarity index 51%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
index 5534217..dff5590 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
@@ -20,36 +20,59 @@ package org.apache.flink.table.executor;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Factory to create an implementation of {@link Executor} to use in a
  * {@link org.apache.flink.table.api.TableEnvironment}. The {@link org.apache.flink.table.api.TableEnvironment}
- * should use {@link #create()} method that does not bind to any particular environment,
+ * should use {@link #create(Map)} method that does not bind to any particular environment,
  * whereas {@link org.apache.flink.table.api.scala.StreamTableEnvironment} should use
- * {@link #create(StreamExecutionEnvironment)} as it is always backed by some {@link StreamExecutionEnvironment}
+ * {@link #create(Map, StreamExecutionEnvironment)} as it is always backed by
+ * some {@link StreamExecutionEnvironment}.
  */
 @Internal
-public class ExecutorFactory {
+public class StreamExecutorFactory implements ExecutorFactory {
+
 	/**
-	 * Creates a {@link StreamExecutor} that is backed by given {@link StreamExecutionEnvironment}.
+	 * Creates a corresponding {@link StreamExecutor}.
+	 *
+	 * <p>NOTE: the Scala {@code StreamTableEnvironmentImpl#lookupExecutor} resolves this method
+	 * reflectively by its name and parameter types, so the signature must be kept in sync with it.
 	 *
+	 * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup.
 	 * @param executionEnvironment a {@link StreamExecutionEnvironment} to use while executing Table programs.
-	 * @return {@link StreamExecutor}
+	 * @return instance of an {@link Executor}
 	 */
-	public static Executor create(StreamExecutionEnvironment executionEnvironment) {
+	public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) {
 		return new StreamExecutor(executionEnvironment);
 	}
 
-	/**
-	 * Creates a {@link StreamExecutor} that is backed by a default {@link StreamExecutionEnvironment}.
-	 *
-	 * @return {@link StreamExecutor}
-	 */
-	public static Executor create() {
+	/**
+	 * Creates a {@link StreamExecutor} on top of the environment returned by
+	 * {@link StreamExecutionEnvironment#getExecutionEnvironment()}.
+	 */
+	@Override
+	public Executor create(Map<String, String> properties) {
 		return new StreamExecutor(StreamExecutionEnvironment.getExecutionEnvironment());
 	}
 
-	private ExecutorFactory() {
+	/** Declares {@link EnvironmentSettings#BATCH_MODE} = {@code false} as the required context. */
+	@Override
+	public Map<String, String> requiredContext() {
+		DescriptorProperties properties = new DescriptorProperties();
+		properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+		return properties.asMap();
+	}
+
+	/** Declares {@link EnvironmentSettings#CLASS_NAME} as the only supported property key. */
+	@Override
+	public List<String> supportedProperties() {
+		return Collections.singletonList(EnvironmentSettings.CLASS_NAME);
+	}
+
+	/** Offers this factory's canonical class name under {@link EnvironmentSettings#CLASS_NAME}. */
+	@Override
+	public Map<String, String> optionalContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+		return context;
 	}
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
new file mode 100644
index 0000000..4efb850
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory to construct a {@link StreamPlanner}.
+ *
+ * <p>Its required context sets {@link EnvironmentSettings#BATCH_MODE} to {@code false}; the
+ * factory's canonical class name is offered as optional context under
+ * {@link EnvironmentSettings#CLASS_NAME}.
+ */
+@Internal
+public final class StreamPlannerFactory implements PlannerFactory {
+
+	/**
+	 * Creates a {@link StreamPlanner} from the given components.
+	 *
+	 * @param properties properties of the planner; not read by this factory
+	 * @param executor the {@link Executor} handed to the planner
+	 * @param tableConfig the {@link TableConfig} handed to the planner
+	 * @param functionCatalog the {@link FunctionCatalog} handed to the planner
+	 * @param catalogManager the {@link CatalogManager} handed to the planner
+	 * @return a new {@link StreamPlanner}
+	 */
+	@Override
+	public Planner create(
+		Map<String, String> properties,
+		Executor executor,
+		TableConfig tableConfig,
+		FunctionCatalog functionCatalog,
+		CatalogManager catalogManager) {
+		return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
+	}
+
+	/** Offers this factory's canonical class name under {@link EnvironmentSettings#CLASS_NAME}. */
+	@Override
+	public Map<String, String> optionalContext() {
+		Map<String, String> map = new HashMap<>();
+		map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+		return map;
+	}
+
+	/** Declares {@link EnvironmentSettings#BATCH_MODE} = {@code false} as the required context. */
+	@Override
+	public Map<String, String> requiredContext() {
+		DescriptorProperties properties = new DescriptorProperties();
+		properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+		return properties.asMap();
+	}
+
+	/** Declares {@link EnvironmentSettings#CLASS_NAME} as the only supported property key. */
+	@Override
+	public List<String> supportedProperties() {
+		return Collections.singletonList(EnvironmentSettings.CLASS_NAME);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ece39eb..6bd2ea3 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -17,3 +17,5 @@ org.apache.flink.table.sources.CsvBatchTableSourceFactory
 org.apache.flink.table.sources.CsvAppendTableSourceFactory
 org.apache.flink.table.sinks.CsvBatchTableSinkFactory
 org.apache.flink.table.sinks.CsvAppendTableSinkFactory
+org.apache.flink.table.planner.StreamPlannerFactory
+org.apache.flink.table.executor.StreamExecutorFactory
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 2b51869..efa29ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -52,8 +52,8 @@ abstract class TableEnvImpl(
     private val catalogManager: CatalogManager)
   extends TableEnvironment {
 
-  protected val defaultCatalogName: String = config.getBuiltInCatalogName
-  protected val defaultDatabaseName: String = config.getBuiltInDatabaseName
+  protected val defaultCatalogName: String = catalogManager.getCurrentCatalog
+  protected val defaultDatabaseName: String = catalogManager.getCurrentDatabase
 
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index bdb2913..44dc4ef 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.stream
 
-import java.lang.{Integer => JInt, Long => JLong}
-
 import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.scala._
@@ -31,14 +29,19 @@ import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => J
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
+
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
 
+import java.lang.{Integer => JInt, Long => JLong}
+
 class StreamTableEnvironmentTest extends TableTestBase {
 
   @Test
@@ -200,12 +203,19 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val jStreamExecEnv = mock(classOf[JStreamExecEnv])
     when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
     val config = new TableConfig
-    val jTEnv = JStreamTableEnvironmentImpl.create(
-      new CatalogManager(
-        config.getBuiltInCatalogName,
-        new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName)),
+    val manager: CatalogManager = new CatalogManager(
+      "default_catalog",
+      new GenericInMemoryCatalog("default_catalog", "default_database"))
+    val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
+    val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+    val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
+    val jTEnv = new JStreamTableEnvironmentImpl(
+      manager,
+      functionCatalog,
       config,
-      jStreamExecEnv)
+      jStreamExecEnv,
+      streamPlanner,
+      executor)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 2ca6ee3..e72225d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.utils
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
@@ -32,12 +30,16 @@ import org.apache.flink.table.api.java.internal.{BatchTableEnvironmentImpl => Ja
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl => ScalaBatchTableEnvironmentImpl, StreamTableEnvironmentImpl => ScalaStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.{Table, TableConfig, TableSchema}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.utils.TableTestUtil.createCatalogManager
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{ComparisonFailure, Rule}
@@ -137,25 +139,11 @@ object TableTestUtil {
 
   val ANY_SUBTREE = "%ANY_SUBTREE%"
 
-  /**
-    * Creates a [[CatalogManager]] with a builtin default catalog & database set to values
-    * specified in the [[TableConfig]].
-    */
-  def createCatalogManager(config: TableConfig): CatalogManager = {
+  /**
+    * Creates a [[CatalogManager]] backed by a [[GenericInMemoryCatalog]]. The catalog is
+    * registered under the name "default_catalog" and is constructed with "default_database".
+    */
+  def createCatalogManager(): CatalogManager = {
+    val defaultCatalog = "default_catalog"
     new CatalogManager(
-      config.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName))
-  }
-
-  /**
-    * Sets the configuration of the builtin catalog & databases in [[TableConfig]]
-    * to the current catalog & database of the given [[CatalogManager]]. This should be used
-    * to ensure sanity of a [[org.apache.flink.table.api.TableEnvironment]].
-    */
-  def extractBuiltinPath(config: TableConfig, catalogManager: CatalogManager): TableConfig = {
-    config.setBuiltInCatalogName(catalogManager.getCurrentCatalog)
-    config.setBuiltInDatabaseName(catalogManager.getCurrentDatabase)
-    config
+      defaultCatalog,
+      new GenericInMemoryCatalog(defaultCatalog, "default_database"))
   }
 
   private[utils] def toRelNode(expected: Table) = {
@@ -239,22 +227,15 @@ case class BatchTableTestUtil(
   extends TableTestUtil {
   val javaEnv = new LocalEnvironment()
 
-  private def tableConfig = catalogManager match {
-    case Some(c) =>
-      TableTestUtil.extractBuiltinPath(new TableConfig, c)
-    case None =>
-      new TableConfig
-  }
-
   val javaTableEnv = new JavaBatchTableEnvironmentImpl(
     javaEnv,
-    tableConfig,
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)))
+    new TableConfig,
+    catalogManager.getOrElse(createCatalogManager()))
   val env = new ExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaBatchTableEnvironmentImpl(
     env,
-    tableConfig,
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)))
+    new TableConfig,
+    catalogManager.getOrElse(createCatalogManager()))
 
   def addTable[T: TypeInformation](
       name: String,
@@ -344,22 +325,29 @@ case class StreamTableTestUtil(
   val javaEnv = new LocalStreamEnvironment()
   javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-  private def tableConfig = catalogManager match {
-    case Some(c) =>
-      TableTestUtil.extractBuiltinPath(new TableConfig, c)
-    case None =>
-      new TableConfig
-  }
+  private val tableConfig = new TableConfig
+  private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
+  private val executor: StreamExecutor = new StreamExecutor(javaEnv)
+  private val functionCatalog =
+    new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+  private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
-  val javaTableEnv = JavaStreamTableEnvironmentImpl.create(
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)),
+  val javaTableEnv = new JavaStreamTableEnvironmentImpl(
+    manager,
+    functionCatalog,
     tableConfig,
-    javaEnv)
+    javaEnv,
+    streamPlanner,
+    executor)
+
   val env = new StreamExecutionEnvironment(javaEnv)
-  val tableEnv = ScalaStreamTableEnvironmentImpl.create(
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)),
+  val tableEnv = new ScalaStreamTableEnvironmentImpl(
+    manager,
+    functionCatalog,
     tableConfig,
-    env)
+    env,
+    streamPlanner,
+    executor)
 
   def addTable[T: TypeInformation](
       name: String,