You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/02 14:25:41 UTC

[flink] branch master updated (9fa61aa -> 95944e2)

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9fa61aa  [FLINK-13029][table-planner] Removed usages of ExpressionBridge in QueryOperation's factories
     new 1a224b3  [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService
     new 95944e2  [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-python/pyflink/table/table_config.py         |  36 ----
 .../table/tests/test_table_environment_api.py      |  12 --
 .../client/gateway/local/ExecutionContextTest.java |   3 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   7 +-
 .../table/api/java/BatchTableEnvironment.java      |   5 +-
 .../table/api/java/StreamTableEnvironment.java     | 155 +++++++++-----
 .../java/internal/StreamTableEnvironmentImpl.java  |  66 +++---
 .../flink/table/api/EnvironmentSettings.java       | 228 +++++++++++++++++++++
 .../org/apache/flink/table/api/TableConfig.java    |  44 ----
 .../apache/flink/table/api/TableEnvironment.java   |  22 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/delegation/ExecutorFactory.java    |  50 +++++
 .../internal => delegation}/PlannerFactory.java    |  49 ++---
 .../flink/table/factories/ComponentFactory.java}   |  32 ++-
 .../table/factories/ComponentFactoryService.java   |  80 ++++++++
 .../factories/ComponentFactoryServiceTest.java     |  68 ++++++
 .../factories/utils/OtherTestPlannerFactory.java   |  12 +-
 .../table/factories/utils/TestPlannerFactory.java  |  69 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   3 +-
 .../table/api/scala/BatchTableEnvironment.scala    |   7 +-
 .../table/api/scala/StreamTableEnvironment.scala   | 132 ++++++++----
 .../internal/StreamTableEnvironmentImpl.scala      |  90 ++++----
 .../table/api/AmbiguousTableFactoryException.java  |  36 ++--
 .../apache/flink/table/factories/TableFactory.java |   1 -
 .../flink/table/factories/TableFactoryService.java | 146 ++++++++-----
 .../flink/table/executor/StreamExecutor.java       |   4 +-
 ...utorFactory.java => StreamExecutorFactory.java} |  49 +++--
 .../flink/table/planner/StreamPlannerFactory.java  |  70 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   2 +
 .../flink/table/api/internal/TableEnvImpl.scala    |   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |  26 ++-
 .../factories/TableFormatFactoryServiceTest.scala  |   7 -
 .../apache/flink/table/utils/TableTestBase.scala   |  76 +++----
 33 files changed, 1115 insertions(+), 480 deletions(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
 rename flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/{api/internal => delegation}/PlannerFactory.java (55%)
 copy flink-table/{flink-table-common/src/main/java/org/apache/flink/table/factories/DeserializationSchemaFactory.java => flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java} (52%)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
 create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
 copy flink-core/src/main/java/org/apache/flink/util/SerializableObject.java => flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java (72%)
 create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
 copy {flink-connectors/flink-connector-kafka-0.10/src/main => flink-table/flink-table-api-java/src/test}/resources/META-INF/services/org.apache.flink.table.factories.TableFactory (86%)
 rename flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/{ExecutorFactory.java => StreamExecutorFactory.java} (51%)
 create mode 100644 flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java


[flink] 01/02: [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1a224b3e0b2e11fcd580edb5f8060f7ffd3c61a7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jun 19 15:00:02 2019 +0200

    [hotfix][table-common] Enable finding multiple matching TableFactories from TableFactoryService
    
    This commits adds methods TableFactoryService.findAll that return TableFactories even if they are ambiguous based on the requiredContext and supportedProperties. Additionally it fixes minor issues and improves type hanlding.
---
 .../table/api/AmbiguousTableFactoryException.java  |  36 +++--
 .../apache/flink/table/factories/TableFactory.java |   1 -
 .../flink/table/factories/TableFactoryService.java | 146 ++++++++++++++-------
 .../factories/TableFormatFactoryServiceTest.scala  |   7 -
 4 files changed, 117 insertions(+), 73 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
index 395d5c98..ea1c72e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/AmbiguousTableFactoryException.java
@@ -31,20 +31,20 @@ import java.util.stream.Collectors;
 public class AmbiguousTableFactoryException extends RuntimeException {
 
 	// factories that match the properties
-	private final List<TableFactory> matchingFactories;
+	private final List<? extends TableFactory> matchingFactories;
 	// required factory class
-	private final Class<?> factoryClass;
+	private final Class<? extends TableFactory> factoryClass;
 	// all found factories
 	private final List<TableFactory> factories;
 	// properties that describe the configuration
 	private final Map<String, String> properties;
 
 	public AmbiguousTableFactoryException(
-		List<TableFactory> matchingFactories,
-		Class<?> factoryClass,
-		List<TableFactory> factories,
-		Map<String, String> properties,
-		Throwable cause) {
+			List<? extends TableFactory> matchingFactories,
+			Class<? extends TableFactory> factoryClass,
+			List<TableFactory> factories,
+			Map<String, String> properties,
+			Throwable cause) {
 
 		super(cause);
 		this.matchingFactories = matchingFactories;
@@ -54,10 +54,10 @@ public class AmbiguousTableFactoryException extends RuntimeException {
 	}
 
 	public AmbiguousTableFactoryException(
-		List<TableFactory> matchingFactories,
-		Class<?> factoryClass,
-		List<TableFactory> factories,
-		Map<String, String> properties) {
+			List<? extends TableFactory> matchingFactories,
+			Class<? extends TableFactory> factoryClass,
+			List<TableFactory> factories,
+			Map<String, String> properties) {
 
 		this(matchingFactories, factoryClass, factories, properties, null);
 	}
@@ -70,15 +70,13 @@ public class AmbiguousTableFactoryException extends RuntimeException {
 				"The following properties are requested:\n%s\n\n" +
 				"The following factories have been considered:\n%s",
 			factoryClass.getName(),
-			String.join(
-				"\n",
-				matchingFactories.stream()
-					.map(p -> p.getClass().getName()).collect(Collectors.toList())),
+			matchingFactories.stream()
+				.map(p -> p.getClass().getName())
+				.collect(Collectors.joining("\n")),
 			DescriptorProperties.toString(properties),
-			String.join(
-				"\n",
-				factories.stream().map(p -> p.getClass().getName()).collect(Collectors.toList())
-			)
+			factories.stream()
+				.map(p -> p.getClass().getName())
+				.collect(Collectors.joining("\n"))
 		);
 	}
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
index b56ffa1..d4c0628 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactory.java
@@ -55,7 +55,6 @@ public interface TableFactory {
 	 */
 	Map<String, String> requiredContext();
 
-
 	/**
 	 * List of property keys that this factory can handle. This method will be used for validation.
 	 * If a property is passed that this factory cannot handle, an exception will be thrown. The
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
index ba6eec4..082c37d 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/TableFactoryService.java
@@ -62,9 +62,9 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Descriptor descriptor) {
+	public static <T extends TableFactory> T find(Class<T> factoryClass, Descriptor descriptor) {
 		Preconditions.checkNotNull(descriptor);
-		return findInternal(factoryClass, descriptor.toProperties(), Optional.empty());
+		return findSingleInternal(factoryClass, descriptor.toProperties(), Optional.empty());
 	}
 
 	/**
@@ -76,10 +76,13 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Descriptor descriptor, ClassLoader classLoader) {
+	public static <T extends TableFactory> T find(
+			Class<T> factoryClass,
+			Descriptor descriptor,
+			ClassLoader classLoader) {
 		Preconditions.checkNotNull(descriptor);
 		Preconditions.checkNotNull(classLoader);
-		return findInternal(factoryClass, descriptor.toProperties(), Optional.of(classLoader));
+		return findSingleInternal(factoryClass, descriptor.toProperties(), Optional.of(classLoader));
 	}
 
 	/**
@@ -90,8 +93,8 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
-		return findInternal(factoryClass, propertyMap, Optional.empty());
+	public static <T extends TableFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
+		return findSingleInternal(factoryClass, propertyMap, Optional.empty());
 	}
 
 	/**
@@ -103,9 +106,52 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T find(Class<T> factoryClass, Map<String, String> propertyMap, ClassLoader classLoader) {
+	public static <T extends TableFactory> T find(
+			Class<T> factoryClass,
+			Map<String, String> propertyMap,
+			ClassLoader classLoader) {
 		Preconditions.checkNotNull(classLoader);
-		return findInternal(factoryClass, propertyMap, Optional.of(classLoader));
+		return findSingleInternal(factoryClass, propertyMap, Optional.of(classLoader));
+	}
+
+	/**
+	 * Finds all table factories of the given class and property map.
+	 *
+	 * @param factoryClass desired factory class
+	 * @param propertyMap properties that describe the factory configuration
+	 * @param <T> factory class type
+	 * @return all the matching factories
+	 */
+	public static <T extends TableFactory> List<T> findAll(Class<T> factoryClass, Map<String, String> propertyMap) {
+		return findAllInternal(factoryClass, propertyMap, Optional.empty());
+	}
+
+	/**
+	 * Finds a table factory of the given class, property map, and classloader.
+	 *
+	 * @param factoryClass desired factory class
+	 * @param properties properties that describe the factory configuration
+	 * @param classLoader classloader for service loading
+	 * @param <T> factory class type
+	 * @return the matching factory
+	 */
+	private static <T extends TableFactory> T findSingleInternal(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			Optional<ClassLoader> classLoader) {
+
+		List<TableFactory> tableFactories = discoverFactories(classLoader);
+		List<T> filtered = filter(tableFactories, factoryClass, properties);
+
+		if (filtered.size() > 1) {
+			throw new AmbiguousTableFactoryException(
+				filtered,
+				factoryClass,
+				tableFactories,
+				properties);
+		} else {
+			return filtered.get(0);
+		}
 	}
 
 	/**
@@ -117,19 +163,32 @@ public class TableFactoryService {
 	 * @param <T> factory class type
 	 * @return the matching factory
 	 */
-	public static <T> T findInternal(Class<T> factoryClass, Map<String, String> properties, Optional<ClassLoader> classLoader) {
+	private static <T extends TableFactory> List<T> findAllInternal(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			Optional<ClassLoader> classLoader) {
+
+		List<TableFactory> tableFactories = discoverFactories(classLoader);
+		return filter(tableFactories, factoryClass, properties);
+	}
+
+	/**
+	 * Filters found factories by factory class and with matching context.
+	 */
+	private static <T extends TableFactory> List<T> filter(
+			List<TableFactory> foundFactories,
+			Class<T> factoryClass,
+			Map<String, String> properties) {
 
 		Preconditions.checkNotNull(factoryClass);
 		Preconditions.checkNotNull(properties);
 
-		List<TableFactory> foundFactories = discoverFactories(classLoader);
-
-		List<TableFactory> classFactories = filterByFactoryClass(
+		List<T> classFactories = filterByFactoryClass(
 			factoryClass,
 			properties,
 			foundFactories);
 
-		List<TableFactory> contextFactories = filterByContext(
+		List<T> contextFactories = filterByContext(
 			factoryClass,
 			properties,
 			foundFactories,
@@ -169,10 +228,11 @@ public class TableFactoryService {
 	/**
 	 * Filters factories with matching context by factory class.
 	 */
-	private static <T> List<TableFactory> filterByFactoryClass(
-		Class<T> factoryClass,
-		Map<String, String> properties,
-		List<TableFactory> foundFactories) {
+	@SuppressWarnings("unchecked")
+	private static <T> List<T> filterByFactoryClass(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			List<TableFactory> foundFactories) {
 
 		List<TableFactory> classFactories = foundFactories.stream()
 			.filter(p -> factoryClass.isAssignableFrom(p.getClass()))
@@ -186,7 +246,7 @@ public class TableFactoryService {
 				properties);
 		}
 
-		return classFactories;
+		return (List<T>) classFactories;
 	}
 
 	/**
@@ -194,13 +254,13 @@ public class TableFactoryService {
 	 *
 	 * @return all matching factories
 	 */
-	private static <T> List<TableFactory> filterByContext(
-		Class<T> factoryClass,
-		Map<String, String> properties,
-		List<TableFactory> foundFactories,
-		List<TableFactory> classFactories) {
+	private static <T extends TableFactory> List<T> filterByContext(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			List<TableFactory> foundFactories,
+			List<T> classFactories) {
 
-		List<TableFactory> matchingFactories = classFactories.stream().filter(factory -> {
+		List<T> matchingFactories = classFactories.stream().filter(factory -> {
 			Map<String, String> requestedContext = normalizeContext(factory);
 
 			Map<String, String> plainContext = new HashMap<>(requestedContext);
@@ -214,7 +274,9 @@ public class TableFactoryService {
 			plainContext.remove(org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_PROPERTY_VERSION);
 
 			// check if required context is met
-			return plainContext.keySet().stream().allMatch(e -> properties.containsKey(e) && properties.get(e).equals(plainContext.get(e)));
+			return plainContext.keySet()
+				.stream()
+				.allMatch(e -> properties.containsKey(e) && properties.get(e).equals(plainContext.get(e)));
 		}).collect(Collectors.toList());
 
 		if (matchingFactories.isEmpty()) {
@@ -238,17 +300,17 @@ public class TableFactoryService {
 				String.format("Required context of factory '%s' must not be null.", factory.getClass().getName()));
 		}
 		return requiredContext.keySet().stream()
-			.collect(Collectors.toMap(key -> key.toLowerCase(), key -> requiredContext.get(key)));
+			.collect(Collectors.toMap(String::toLowerCase, requiredContext::get));
 	}
 
 	/**
 	 * Filters the matching class factories by supported properties.
 	 */
-	private static <T> T filterBySupportedProperties(
-		Class<T> factoryClass,
-		Map<String, String> properties,
-		List<TableFactory> foundFactories,
-		List<TableFactory> classFactories) {
+	private static <T extends TableFactory> List<T> filterBySupportedProperties(
+			Class<T> factoryClass,
+			Map<String, String> properties,
+			List<TableFactory> foundFactories,
+			List<T> classFactories) {
 
 		final List<String> plainGivenKeys = new LinkedList<>();
 		properties.keySet().forEach(k -> {
@@ -261,8 +323,8 @@ public class TableFactoryService {
 		});
 
 		Optional<String> lastKey = Optional.empty();
-		List<TableFactory> supportedFactories = new LinkedList<>();
-		for (TableFactory factory: classFactories) {
+		List<T> supportedFactories = new LinkedList<>();
+		for (T factory: classFactories) {
 			Set<String> requiredContextKeys = normalizeContext(factory).keySet();
 			Tuple2<List<String>, List<String>> tuple2 = normalizeSupportedProperties(factory);
 			// ignore context keys
@@ -273,10 +335,10 @@ public class TableFactoryService {
 				factory,
 				givenContextFreeKeys);
 
-			Boolean allTrue = true;
+			boolean allTrue = true;
 			for (String k: givenFilteredKeys) {
 				lastKey = Optional.of(k);
-				if (!(tuple2.f0.contains(k) || tuple2.f1.stream().anyMatch(p -> k.startsWith(p)))) {
+				if (!(tuple2.f0.contains(k) || tuple2.f1.stream().anyMatch(k::startsWith))) {
 					allTrue = false;
 					break;
 				}
@@ -310,15 +372,9 @@ public class TableFactoryService {
 				factoryClass,
 				foundFactories,
 				properties);
-		} else if (supportedFactories.size() > 1) {
-			throw new AmbiguousTableFactoryException(
-				supportedFactories,
-				factoryClass,
-				foundFactories,
-				properties);
 		}
 
-		return (T) supportedFactories.get(0);
+		return supportedFactories;
 	}
 
 	/**
@@ -332,7 +388,7 @@ public class TableFactoryService {
 					factory.getClass().getName()));
 		}
 		List<String> supportedKeys = supportedProperties.stream()
-			.map(p -> p.toLowerCase())
+			.map(String::toLowerCase)
 			.collect(Collectors.toList());
 
 		// extract wildcard prefixes
@@ -353,9 +409,7 @@ public class TableFactoryService {
 	/**
 	 * Performs filtering for special cases (i.e. table format factories with schema derivation).
 	 */
-	private static List<String> filterSupportedPropertiesFactorySpecific(
-		TableFactory factory,
-		List<String> keys) {
+	private static List<String> filterSupportedPropertiesFactorySpecific(TableFactory factory, List<String> keys) {
 
 		if (factory instanceof TableFormatFactory) {
 			boolean includeSchema = ((TableFormatFactory) factory).supportsSchemaDerivation();
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
index ab5a051..6b695c7 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/factories/TableFormatFactoryServiceTest.scala
@@ -94,13 +94,6 @@ class TableFormatFactoryServiceTest {
   }
 
   @Test(expected = classOf[NoMatchingTableFactoryException])
-  def testMissingClass(): Unit = {
-    val props = properties()
-    // this class is not a valid factory
-    TableFactoryService.find(classOf[TableFormatFactoryServiceTest], props)
-  }
-
-  @Test(expected = classOf[NoMatchingTableFactoryException])
   def testInvalidContext(): Unit = {
     val props = properties()
     // no context specifies this


[flink] 02/02: [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor

Posted by tw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 95944e231315f085d5e23717332aa2866caa5d8a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Jun 19 15:00:23 2019 +0200

    [FLINK-12798][table] Add a discovery mechanism for switching between Flink/Blink Planner/Executor
    
    This closes #8852.
---
 flink-python/pyflink/table/table_config.py         |  36 ----
 .../table/tests/test_table_environment_api.py      |  12 --
 .../client/gateway/local/ExecutionContextTest.java |   3 +-
 .../client/gateway/local/LocalExecutorITCase.java  |   7 +-
 .../table/api/java/BatchTableEnvironment.java      |   5 +-
 .../table/api/java/StreamTableEnvironment.java     | 155 +++++++++-----
 .../java/internal/StreamTableEnvironmentImpl.java  |  66 +++---
 .../flink/table/api/EnvironmentSettings.java       | 228 +++++++++++++++++++++
 .../org/apache/flink/table/api/TableConfig.java    |  44 ----
 .../apache/flink/table/api/TableEnvironment.java   |  22 +-
 .../table/api/internal/TableEnvironmentImpl.java   |   4 +-
 .../flink/table/delegation/ExecutorFactory.java    |  50 +++++
 .../internal => delegation}/PlannerFactory.java    |  49 ++---
 .../flink/table/factories/ComponentFactory.java    |  53 +++++
 .../table/factories/ComponentFactoryService.java   |  80 ++++++++
 .../factories/ComponentFactoryServiceTest.java     |  68 ++++++
 .../factories/utils/OtherTestPlannerFactory.java   |  28 +++
 .../table/factories/utils/TestPlannerFactory.java  |  69 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   6 +-
 .../table/api/scala/BatchTableEnvironment.scala    |   7 +-
 .../table/api/scala/StreamTableEnvironment.scala   | 132 ++++++++----
 .../internal/StreamTableEnvironmentImpl.scala      |  90 ++++----
 .../flink/table/executor/StreamExecutor.java       |   4 +-
 ...utorFactory.java => StreamExecutorFactory.java} |  49 +++--
 .../flink/table/planner/StreamPlannerFactory.java  |  70 +++++++
 .../org.apache.flink.table.factories.TableFactory  |   2 +
 .../flink/table/api/internal/TableEnvImpl.scala    |   4 +-
 .../api/stream/StreamTableEnvironmentTest.scala    |  26 ++-
 .../apache/flink/table/utils/TableTestBase.scala   |  76 +++----
 29 files changed, 1052 insertions(+), 393 deletions(-)

diff --git a/flink-python/pyflink/table/table_config.py b/flink-python/pyflink/table/table_config.py
index 7eb3513..d6b5864 100644
--- a/flink-python/pyflink/table/table_config.py
+++ b/flink-python/pyflink/table/table_config.py
@@ -90,39 +90,3 @@ class TableConfig(object):
             self._j_table_config.setMaxGeneratedCodeLength(max_generated_code_length)
         else:
             raise Exception("TableConfig.max_generated_code_length should be a int value!")
-
-    def get_built_in_catalog_name(self):
-        """
-        Gets the specified name of the initial catalog to be created when instantiating
-        :class:`TableEnvironment`.
-        """
-        return self._j_table_config.getBuiltInCatalogName()
-
-    def set_built_in_catalog_name(self, built_in_catalog_name):
-        """
-        Specifies the name of the initial catalog to be created when instantiating
-        :class:`TableEnvironment`. This method has no effect if called on the
-        :func:`~pyflink.table.TableEnvironment.get_config`.
-        """
-        if built_in_catalog_name is not None and isinstance(built_in_catalog_name, str):
-            self._j_table_config.setBuiltInCatalogName(built_in_catalog_name)
-        else:
-            raise Exception("TableConfig.built_in_catalog_name should be a string value!")
-
-    def get_built_in_database_name(self):
-        """
-        Gets the specified name of the default database in the initial catalog to be created when
-        instantiating :class:`TableEnvironment`.
-        """
-        return self._j_table_config.getBuiltInDatabaseName()
-
-    def set_built_in_database_name(self, built_in_database_name):
-        """
-        Specifies the name of the default database in the initial catalog to be created when
-        instantiating :class:`TableEnvironment`. This method has no effect if called on the
-        :func:`~pyflink.table.TableEnvironment.get_config`.
-        """
-        if built_in_database_name is not None and isinstance(built_in_database_name, str):
-            self._j_table_config.setBuiltInDatabaseName(built_in_database_name)
-        else:
-            raise Exception("TableConfig.built_in_database_name should be a string value!")
diff --git a/flink-python/pyflink/table/tests/test_table_environment_api.py b/flink-python/pyflink/table/tests/test_table_environment_api.py
index a129d22..4eba1bb 100644
--- a/flink-python/pyflink/table/tests/test_table_environment_api.py
+++ b/flink-python/pyflink/table/tests/test_table_environment_api.py
@@ -170,8 +170,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         table_config.set_max_generated_code_length(32000)
         table_config.set_null_check(False)
         table_config.set_timezone("Asia/Shanghai")
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         env = StreamExecutionEnvironment.get_execution_environment()
         t_env = StreamTableEnvironment.create(env, table_config)
@@ -181,8 +179,6 @@ class StreamTableEnvironmentTests(PyFlinkStreamTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(table_config.get_built_in_database_name(), "test_database")
 
 
 class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
@@ -208,22 +204,16 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         table_config.set_timezone("Asia/Shanghai")
         table_config.set_max_generated_code_length(64000)
         table_config.set_null_check(True)
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         self.assertTrue(table_config.get_null_check())
         self.assertEqual(table_config.get_max_generated_code_length(), 64000)
         self.assertEqual(table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(table_config.get_built_in_database_name(), "test_database")
 
     def test_create_table_environment(self):
         table_config = TableConfig()
         table_config.set_max_generated_code_length(32000)
         table_config.set_null_check(False)
         table_config.set_timezone("Asia/Shanghai")
-        table_config.set_built_in_catalog_name("test_catalog")
-        table_config.set_built_in_database_name("test_database")
 
         env = ExecutionEnvironment.get_execution_environment()
         t_env = BatchTableEnvironment.create(env, table_config)
@@ -233,5 +223,3 @@ class BatchTableEnvironmentTests(PyFlinkBatchTableTestCase):
         self.assertFalse(readed_table_config.get_null_check())
         self.assertEqual(readed_table_config.get_max_generated_code_length(), 32000)
         self.assertEqual(readed_table_config.get_timezone(), "Asia/Shanghai")
-        self.assertEqual(readed_table_config.get_built_in_catalog_name(), "test_catalog")
-        self.assertEqual(readed_table_config.get_built_in_database_name(), "test_database")
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index ac60fcc..fb0c80c 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
@@ -110,7 +109,7 @@ public class ExecutionContextTest {
 		assertEquals(
 			new HashSet<>(
 				Arrays.asList(
-					TableConfig.getDefault().getBuiltInCatalogName(),
+					"default_catalog",
 					inmemoryCatalog,
 					hiveCatalog,
 					hiveDefaultVersionCatalog,
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 4f92ae7..023d656 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.client.config.Environment;
 import org.apache.flink.table.client.config.entries.ViewEntry;
@@ -76,7 +75,6 @@ import static org.junit.Assert.fail;
 public class LocalExecutorITCase extends TestLogger {
 
 	private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
-
 	private static final int NUM_TMS = 2;
 	private static final int NUM_SLOTS_PER_TM = 2;
 
@@ -158,7 +156,7 @@ public class LocalExecutorITCase extends TestLogger {
 		final List<String> actualCatalogs = executor.listCatalogs(session);
 
 		final List<String> expectedCatalogs = Arrays.asList(
-			TableConfig.getDefault().getBuiltInCatalogName(),
+			"default_catalog",
 			"catalog1");
 		assertEquals(expectedCatalogs, actualCatalogs);
 	}
@@ -170,8 +168,7 @@ public class LocalExecutorITCase extends TestLogger {
 
 		final List<String> actualDatabases = executor.listDatabases(session);
 
-		final List<String> expectedDatabases = Arrays.asList(
-			TableConfig.getDefault().getBuiltInDatabaseName());
+		final List<String> expectedDatabases = Arrays.asList("default_database");
 		assertEquals(expectedDatabases, actualDatabases);
 	}
 
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
index 1759b60..16f63af 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/BatchTableEnvironment.java
@@ -287,9 +287,10 @@ public interface BatchTableEnvironment extends TableEnvironment {
 		try {
 			Class<?> clazz = Class.forName("org.apache.flink.table.api.java.internal.BatchTableEnvironmentImpl");
 			Constructor con = clazz.getConstructor(ExecutionEnvironment.class, TableConfig.class, CatalogManager.class);
+			String defaultCatalog = "default_catalog";
 			CatalogManager catalogManager = new CatalogManager(
-				tableConfig.getBuiltInCatalogName(),
-				new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName())
+				defaultCatalog,
+				new GenericInMemoryCatalog(defaultCatalog, "default_database")
 			);
 			return (BatchTableEnvironment) con.newInstance(executionEnvironment, tableConfig, catalogManager);
 		} catch (Throwable t) {
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
index a4f5df2..6859780 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/StreamTableEnvironment.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
@@ -35,24 +36,115 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 
 /**
- * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
- * {@link DataStream}s.
+ * This table environment is the entry point and central context for creating Table & SQL
+ * API programs that integrate with the Java-specific {@link DataStream} API.
  *
- * <p>A TableEnvironment can be used to:
+ * <p>It is unified for bounded and unbounded data processing.
+ *
+ * <p>A stream table environment is responsible for:
  * <ul>
- *     <li>convert a {@link DataStream} to a {@link Table}</li>
- *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
- *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
- *     <li>scan a registered table to obtain a {@link Table}</li>
- *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
- *     <li>convert a {@link Table} into a {@link DataStream}</li>
- *     <li>explain the AST and execution plan of a {@link Table}</li>
+ *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+ *     <li>Connecting to external systems.</li>
+ *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ *     <li>Executing SQL statements.</li>
+ *     <li>Offering further configuration options.</li>
  * </ul>
+ *
+ * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+ * for pure table programs.
  */
 @PublicEvolving
 public interface StreamTableEnvironment extends TableEnvironment {
 
 	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs that integrate with the Java-specific {@link DataStream} API.
+	 *
+	 * <p>It is unified for bounded and unbounded data processing.
+	 *
+	 * <p>A stream table environment is responsible for:
+	 * <ul>
+	 *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+	 * for pure table programs.
+	 *
+	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+	 */
+	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
+		return create(
+			executionEnvironment,
+			EnvironmentSettings.newInstance().build());
+	}
+
+	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs that integrate with the Java-specific {@link DataStream} API.
+	 *
+	 * <p>It is unified for bounded and unbounded data processing.
+	 *
+	 * <p>A stream table environment is responsible for:
+	 * <ul>
+	 *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+	 * for pure table programs.
+	 *
+	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+	 * @param settings The environment settings used to instantiate the {@link TableEnvironment}.
+	 */
+	static StreamTableEnvironment create(
+			StreamExecutionEnvironment executionEnvironment,
+			EnvironmentSettings settings) {
+		return StreamTableEnvironmentImpl.create(
+			executionEnvironment,
+			settings,
+			new TableConfig()
+		);
+	}
+
+	/**
+	 * Creates a table environment that is the entry point and central context for creating Table & SQL
+	 * API programs that integrate with the Java-specific {@link DataStream} API.
+	 *
+	 * <p>It is unified for bounded and unbounded data processing.
+	 *
+	 * <p>A stream table environment is responsible for:
+	 * <ul>
+	 *     <li>Convert a {@link DataStream} into {@link Table} and vice-versa.</li>
+	 *     <li>Connecting to external systems.</li>
+	 *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+	 *     <li>Executing SQL statements.</li>
+	 *     <li>Offering further configuration options.</li>
+	 * </ul>
+	 *
+	 * <p>Note: If you don't intend to use the {@link DataStream} API, {@link TableEnvironment} is meant
+	 * for pure table programs.
+	 *
+	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the {@link TableEnvironment}.
+	 * @param tableConfig The configuration of the {@link TableEnvironment}.
+	 * @deprecated Use {@link #create(StreamExecutionEnvironment)} and {@link #getConfig()}
+	 * for manipulating {@link TableConfig}.
+	 */
+	@Deprecated
+	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
+		return StreamTableEnvironmentImpl.create(
+			executionEnvironment,
+			EnvironmentSettings.newInstance().build(),
+			tableConfig);
+	}
+
+	/**
 	 * Registers a {@link TableFunction} under a unique name in the TableEnvironment's catalog.
 	 * Registered functions can be referenced in Table API and SQL queries.
 	 *
@@ -356,47 +448,4 @@ public interface StreamTableEnvironment extends TableEnvironment {
 	 */
 	@Override
 	StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor);
-
-	/**
-	 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
-	 * {@link DataStream}s.
-	 *
-	 * <p>A TableEnvironment can be used to:
-	 * <ul>
-	 *     <li>convert a {@link DataStream} to a {@link Table}</li>
-	 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>scan a registered table to obtain a {@link Table}</li>
-	 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
-	 *     <li>convert a {@link Table} into a {@link DataStream}</li>
-	 *     <li>explain the AST and execution plan of a {@link Table}</li>
-	 * </ul>
-	 *
-	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 */
-	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
-		return create(executionEnvironment, new TableConfig());
-	}
-
-	/**
-	 * The {@link TableEnvironment} for a Java {@link StreamExecutionEnvironment} that works with
-	 * {@link DataStream}s.
-	 *
-	 * <p>A TableEnvironment can be used to:
-	 * <ul>
-	 *     <li>convert a {@link DataStream} to a {@link Table}</li>
-	 *     <li>register a {@link DataStream} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>register a {@link Table} in the {@link TableEnvironment}'s catalog</li>
-	 *     <li>scan a registered table to obtain a {@link Table}</li>
-	 *     <li>specify a SQL query on registered tables to obtain a {@link Table}</li>
-	 *     <li>convert a {@link Table} into a {@link DataStream}</li>
-	 *     <li>explain the AST and execution plan of a {@link Table}</li>
-	 * </ul>
-	 *
-	 * @param executionEnvironment The Java {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 * @param tableConfig The configuration of the TableEnvironment.
-	 */
-	static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, TableConfig tableConfig) {
-		return StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment);
-	}
 }
diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
index 0ea02a3..6b37690 100644
--- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/java/internal/StreamTableEnvironmentImpl.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.api.java.internal;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -27,24 +28,27 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StreamQueryConfig;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.Types;
 import org.apache.flink.table.api.ValidationException;
-import org.apache.flink.table.api.internal.PlannerFactory;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
 import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.descriptors.ConnectorDescriptor;
 import org.apache.flink.table.descriptors.StreamTableDescriptor;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.ExpressionParser;
+import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
@@ -60,6 +64,7 @@ import org.apache.flink.table.typeutils.FieldInfoUtils;
 import java.lang.reflect.Method;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 /**
@@ -73,7 +78,8 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 
 	private final StreamExecutionEnvironment executionEnvironment;
 
-	private StreamTableEnvironmentImpl(
+	@VisibleForTesting
+	public StreamTableEnvironmentImpl(
 			CatalogManager catalogManager,
 			FunctionCatalog functionCatalog,
 			TableConfig tableConfig,
@@ -92,33 +98,21 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 	 * @param tableConfig The configuration of the TableEnvironment.
 	 * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
 	 */
-	public static StreamTableEnvironmentImpl create(
-			TableConfig tableConfig,
-			StreamExecutionEnvironment executionEnvironment) {
-		CatalogManager catalogManager = new CatalogManager(
-			tableConfig.getBuiltInCatalogName(),
-			new GenericInMemoryCatalog(tableConfig.getBuiltInCatalogName(), tableConfig.getBuiltInDatabaseName()));
-		return create(catalogManager, tableConfig, executionEnvironment);
-	}
-
-	/**
-	 * Creates an instance of a {@link StreamTableEnvironment}. It uses the {@link StreamExecutionEnvironment} for
-	 * executing queries. This is also the {@link StreamExecutionEnvironment} that will be used when converting
-	 * from/to {@link DataStream}.
-	 *
-	 * @param catalogManager The {@link CatalogManager} to use for storing and looking up {@link Table}s.
-	 * @param tableConfig The configuration of the TableEnvironment.
-	 * @param executionEnvironment The {@link StreamExecutionEnvironment} of the TableEnvironment.
-	 */
-	public static StreamTableEnvironmentImpl create(
-			CatalogManager catalogManager,
-			TableConfig tableConfig,
-			StreamExecutionEnvironment executionEnvironment) {
+	public static StreamTableEnvironment create(
+			StreamExecutionEnvironment executionEnvironment,
+			EnvironmentSettings settings,
+			TableConfig tableConfig) {
 		FunctionCatalog functionCatalog = new FunctionCatalog(
-			catalogManager.getCurrentCatalog(),
-			catalogManager.getCurrentDatabase());
-		Executor executor = lookupExecutor(executionEnvironment);
-		Planner planner = PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager);
+			settings.getBuiltInCatalogName(),
+			settings.getBuiltInDatabaseName());
+		CatalogManager catalogManager = new CatalogManager(
+			settings.getBuiltInCatalogName(),
+			new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()));
+		Map<String, String> plannerProperties = settings.toPlannerProperties();
+		Map<String, String> executorProperties = settings.toExecutorProperties();
+		Executor executor = lookupExecutor(executorProperties, executionEnvironment);
+		Planner planner = ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+			.create(plannerProperties, executor, tableConfig, functionCatalog, catalogManager);
 		return new StreamTableEnvironmentImpl(
 			catalogManager,
 			functionCatalog,
@@ -129,12 +123,18 @@ public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl imple
 		);
 	}
 
-	private static Executor lookupExecutor(StreamExecutionEnvironment executionEnvironment) {
+	private static Executor lookupExecutor(
+			Map<String, String> executorProperties,
+			StreamExecutionEnvironment executionEnvironment) {
 		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory");
-			Method createMethod = clazz.getMethod("create", StreamExecutionEnvironment.class);
-
-			return (Executor) createMethod.invoke(null, executionEnvironment);
+			ExecutorFactory executorFactory = ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+			Method createMethod = executorFactory.getClass()
+				.getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+			return (Executor) createMethod.invoke(
+				executorFactory,
+				executorProperties,
+				executionEnvironment);
 		} catch (Exception e) {
 			throw new TableException(
 				"Could not instantiate the executor. Make sure a planner module is on the classpath",
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
new file mode 100644
index 0000000..37ba179
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Defines all parameters that initialize a table environment. Those parameters are used only
+ * during instantiation of a {@link TableEnvironment} and cannot be changed afterwards.
+ *
+ * <p>Example:
+ * <pre>{@code
+ *    EnvironmentSettings.newInstance()
+ *      .useOldPlanner()
+ *      .inStreamingMode()
+ *      .withBuiltInCatalogName("default_catalog")
+ *      .withBuiltInDatabaseName("default_database")
+ *      .build()
+ * }</pre>
+ */
+@PublicEvolving
+public class EnvironmentSettings {
+	public static final String BATCH_MODE = "batch-mode";
+	public static final String CLASS_NAME = "class-name";
+
+	/**
+	 * Canonical name of the {@link Planner} class to use.
+	 */
+	private final String plannerClass;
+
+	/**
+	 * Canonical name of the {@link Executor} class to use.
+	 */
+	private final String executorClass;
+
+	/**
+	 * Specifies the name of the initial catalog to be created when instantiating
+	 * {@link TableEnvironment}.
+	 */
+	private final String builtInCatalogName;
+
+	/**
+	 * Specifies the name of the default database in the initial catalog to be created when
+	 * instantiating {@link TableEnvironment}.
+	 */
+	private final String builtInDatabaseName;
+
+	/**
+	 * Determines if the table environment should work in a batch ({@code true}) or
+	 * streaming ({@code false}) mode.
+	 */
+	private final boolean isBatchMode;
+
+	private EnvironmentSettings(
+			@Nullable String plannerClass,
+			@Nullable String executorClass,
+			String builtInCatalogName,
+			String builtInDatabaseName,
+			boolean isBatchMode) {
+		this.plannerClass = plannerClass;
+		this.executorClass = executorClass;
+		this.builtInCatalogName = builtInCatalogName;
+		this.builtInDatabaseName = builtInDatabaseName;
+		this.isBatchMode = isBatchMode;
+	}
+
+	/**
+	 * Creates a builder for creating an instance of {@link EnvironmentSettings}.
+	 *
+	 * <p>By default, it does not specify a required planner and will use the one that is available
+	 * on the classpath via discovery.
+	 */
+	public static Builder newInstance() {
+		return new Builder();
+	}
+
+	/**
+	 * Gets the specified name of the initial catalog to be created when instantiating
+	 * a {@link TableEnvironment}.
+	 */
+	public String getBuiltInCatalogName() {
+		return builtInCatalogName;
+	}
+
+	/**
+	 * Gets the specified name of the default database in the initial catalog to be created when instantiating
+	 * a {@link TableEnvironment}.
+	 */
+	public String getBuiltInDatabaseName() {
+		return builtInDatabaseName;
+	}
+
+	@Internal
+	public Map<String, String> toPlannerProperties() {
+		Map<String, String> properties = new HashMap<>(toCommonProperties());
+		if (plannerClass != null) {
+			properties.put(CLASS_NAME, plannerClass);
+		}
+		return properties;
+	}
+
+	@Internal
+	public Map<String, String> toExecutorProperties() {
+		Map<String, String> properties = new HashMap<>(toCommonProperties());
+		if (executorClass != null) {
+			properties.put(CLASS_NAME, executorClass);
+		}
+		return properties;
+	}
+
+	private Map<String, String> toCommonProperties() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put(BATCH_MODE, Boolean.toString(isBatchMode));
+		return properties;
+	}
+
+	/**
+	 * A builder for {@link EnvironmentSettings}.
+	 */
+	public static class Builder {
+		private String plannerClass = null;
+		private String executorClass = null;
+		private String builtInCatalogName = "default_catalog";
+		private String builtInDatabaseName = "default_database";
+		private boolean isBatchMode = false;
+
+		/**
+		 * Sets the old Flink planner as the required module. By default, {@link #useAnyPlanner()} is
+		 * enabled.
+		 */
+		public Builder useOldPlanner() {
+			this.plannerClass = "org.apache.flink.table.planner.StreamPlannerFactory";
+			this.executorClass = "org.apache.flink.table.executor.StreamExecutorFactory";
+			return this;
+		}
+
+		/**
+		 * Sets the Blink planner as the required module. By default, {@link #useAnyPlanner()} is
+		 * enabled.
+		 */
+		public Builder useBlinkPlanner() {
+			throw new UnsupportedOperationException("The Blink planner is not supported yet.");
+		}
+
+		/**
+		 * Does not set a planner requirement explicitly.
+		 *
+		 * <p>A planner will be discovered automatically, if there is only one planner available.
+		 *
+		 * <p>This is the default behavior.
+		 */
+		public Builder useAnyPlanner() {
+			this.plannerClass = null;
+			this.executorClass = null;
+			return this;
+		}
+
+		/**
+		 * Sets that the components should work in a batch mode. Streaming mode by default.
+		 */
+		public Builder inBatchMode() {
+			this.isBatchMode = true;
+			return this;
+		}
+
+		/**
+		 * Sets that the components should work in a streaming mode. Enabled by default.
+		 */
+		public Builder inStreamingMode() {
+			this.isBatchMode = false;
+			return this;
+		}
+
+		/**
+		 * Specifies the name of the initial catalog to be created when instantiating
+		 * a {@link TableEnvironment}. Default: "default_catalog".
+		 */
+		public Builder withBuiltInCatalogName(String builtInCatalogName) {
+			this.builtInCatalogName = builtInCatalogName;
+			return this;
+		}
+
+		/**
+		 * Specifies the name of the default database in the initial catalog to be created when instantiating
+		 * a {@link TableEnvironment}. Default: "default_database".
+		 */
+		public Builder withBuiltInDatabaseName(String builtInDatabaseName) {
+			this.builtInDatabaseName = builtInDatabaseName;
+			return this;
+		}
+
+		/**
+		 * Returns an immutable instance of {@link EnvironmentSettings}.
+		 */
+		public EnvironmentSettings build() {
+			return new EnvironmentSettings(
+				plannerClass,
+				executorClass,
+				builtInCatalogName,
+				builtInDatabaseName,
+				isBatchMode);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
index 8e5ba8a..325732f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableConfig.java
@@ -59,18 +59,6 @@ public class TableConfig {
 	private Integer maxGeneratedCodeLength = 64000; // just an estimate
 
 	/**
-	 * Specifies the name of the initial catalog to be created when instantiating
-	 * TableEnvironment.
-	 */
-	private String builtInCatalogName = "default_catalog";
-
-	/**
-	 * Specifies the name of the default database in the initial catalog to be created when instantiating
-	 * TableEnvironment.
-	 */
-	private String builtInDatabaseName = "default_database";
-
-	/**
 	 * Returns the timezone for date/time/timestamp conversions.
 	 */
 	public TimeZone getTimeZone() {
@@ -147,38 +135,6 @@ public class TableConfig {
 		this.maxGeneratedCodeLength = Preconditions.checkNotNull(maxGeneratedCodeLength);
 	}
 
-	/**
-	 * Gets the specified name of the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}.
-	 */
-	public String getBuiltInCatalogName() {
-		return builtInCatalogName;
-	}
-
-	/**
-	 * Specifies the name of the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
-	 */
-	public void setBuiltInCatalogName(String builtInCatalogName) {
-		this.builtInCatalogName = builtInCatalogName;
-	}
-
-	/**
-	 * Gets the specified name of the default database in the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}.
-	 */
-	public String getBuiltInDatabaseName() {
-		return builtInDatabaseName;
-	}
-
-	/**
-	 * Specifies the name of the default database in the initial catalog to be created when instantiating
-	 * a {@link TableEnvironment}. This method has no effect if called on the {@link TableEnvironment#getConfig()}.
-	 */
-	public void setBuiltInDatabaseName(String builtInDatabaseName) {
-		this.builtInDatabaseName = builtInDatabaseName;
-	}
-
 	public static TableConfig getDefault() {
 		return new TableConfig();
 	}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
index b2b50a0..85d20f6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/TableEnvironment.java
@@ -31,21 +31,23 @@ import org.apache.flink.table.sources.TableSource;
 import java.util.Optional;
 
 /**
- * The base class for batch and stream TableEnvironments.
+ * A table environment is the base class, entry point, and central context for creating Table & SQL
+ * API programs.
  *
- * <p>The TableEnvironment is a central concept of the Table API and SQL integration. It is
- * responsible for:
+ * <p>It is unified both on a language level for all JVM-based languages (i.e. there is no distinction
+ * between Scala and Java API) and for bounded and unbounded data processing.
  *
+ * <p>A table environment is responsible for:
  * <ul>
- *     <li>Registering a Table in the internal catalog</li>
- *     <li>Registering an external catalog</li>
- *     <li>Executing SQL queries</li>
- *     <li>Registering a user-defined scalar function. For the user-defined table and aggregate
- *     function, use the StreamTableEnvironment or BatchTableEnvironment</li>
+ *     <li>Connecting to external systems.</li>
+ *     <li>Registering and retrieving {@link Table}s and other meta objects from a catalog.</li>
+ *     <li>Executing SQL statements.</li>
+ *     <li>Offering further configuration options.</li>
  * </ul>
  *
- * <p>This environment is unified both on a language level (for all JVM-based languages, i.e. no distinction between
- * Scala and Java API) and for bounded and unbounded data processing.
+ * <p>Note: This environment is meant for pure table programs. If you would like to convert from or to
+ * other Flink APIs, it might be necessary to use one of the available language-specific table environments
+ * in the corresponding bridging modules.
  */
 @PublicEvolving
 public interface TableEnvironment {
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 40849aa..727727a 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -98,8 +98,8 @@ public class TableEnvironmentImpl implements TableEnvironment {
 
 		this.tableConfig = tableConfig;
 		this.tableConfig.addPlannerConfig(queryConfigProvider);
-		this.defaultCatalogName = tableConfig.getBuiltInCatalogName();
-		this.defaultDatabaseName = tableConfig.getBuiltInDatabaseName();
+		this.defaultCatalogName = catalogManager.getCurrentCatalog();
+		this.defaultDatabaseName = catalogManager.getCurrentDatabase();
 
 		this.functionCatalog = functionCatalog;
 		this.planner = planner;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
new file mode 100644
index 0000000..7de03c8
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExecutorFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.factories.ComponentFactory;
+
+import java.util.Map;
+
+/**
+ * Factory that creates {@link Executor}.
+ *
+ * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
+ * called with a set of normalized properties that describe the desired configuration. Those properties
+ * may include execution configurations such as watermark interval, max parallelism etc., table specific
+ * initialization configuration such as if the queries should be executed in batch mode.
+ *
+ * <p><b>Important:</b> The implementations of this interface should also implement method
+ * <pre>{@code public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment);}
+ * </pre> This method will be used when instantiating a {@link org.apache.flink.table.api.TableEnvironment} from a
+ * bridging module which enables conversion from/to {@code DataStream} API and requires a pre configured
+ * {@code StreamTableEnvironment}.
+ */
+@Internal
+public interface ExecutorFactory extends ComponentFactory {
+
+	/**
+	 * Creates a corresponding {@link Executor}.
+	 *
+	 * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup.
+	 * @return instance of a {@link Executor}
+	 */
+	Executor create(Map<String, String> properties);
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
similarity index 55%
rename from flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java
rename to flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
index fa1128e..0df52d4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/PlannerFactory.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/PlannerFactory.java
@@ -16,54 +16,41 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.api.internal;
+package org.apache.flink.table.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.ComponentFactory;
 
-import java.lang.reflect.Constructor;
+import java.util.Map;
 
 /**
- * Factory to construct a {@link Planner}. It will look for the planner on the classpath.
+ * Factory that creates {@link Planner}.
+ *
+ * <p>This factory is used with Java's Service Provider Interfaces (SPI) for discovering. A factory is
+ * called with a set of normalized properties that describe the desired configuration. Those properties
+ * may include execution configurations such as watermark interval, max parallelism etc., table specific
+ * initialization configuration such as if the queries should be executed in batch mode.
  */
 @Internal
-public final class PlannerFactory {
+public interface PlannerFactory extends ComponentFactory {
 
 	/**
-	 * Looks up {@link Planner} on the class path via reflection.
+	 * Creates a corresponding {@link Planner}.
 	 *
+	 * @param properties Static properties of the {@link Planner}, the same that were used for factory lookup.
 	 * @param executor The executor required by the planner.
 	 * @param tableConfig The configuration of the planner to use.
 	 * @param functionCatalog The function catalog to look up user defined functions.
 	 * @param catalogManager The catalog manager to look up tables and views.
 	 * @return instance of a {@link Planner}
 	 */
-	public static Planner lookupPlanner(
-			Executor executor,
-			TableConfig tableConfig,
-			FunctionCatalog functionCatalog,
-			CatalogManager catalogManager) {
-		try {
-			Class<?> clazz = Class.forName("org.apache.flink.table.planner.StreamPlanner");
-			Constructor con = clazz.getConstructor(
-				Executor.class,
-				TableConfig.class,
-				FunctionCatalog.class,
-				CatalogManager.class);
-
-			return (Planner) con.newInstance(executor, tableConfig, functionCatalog, catalogManager);
-		} catch (Exception e) {
-			throw new TableException(
-				"Could not instantiate the planner. Make sure the planner module is on the classpath",
-				e);
-		}
-	}
-
-	private PlannerFactory() {
-	}
+	Planner create(
+		Map<String, String> properties,
+		Executor executor,
+		TableConfig tableConfig,
+		FunctionCatalog functionCatalog,
+		CatalogManager catalogManager);
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java
new file mode 100644
index 0000000..45e87235
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A factory interface for components that enables further disambiguating in case
+ * there are multiple matching implementations present.
+ */
+@PublicEvolving
+public interface ComponentFactory extends TableFactory {
+	/**
+	 * Specifies a context of optional parameters that if exist should have the
+	 * given values. This enables further disambiguating if there are multiple
+	 * factories that meet the {@link #requiredContext()} and {@link #supportedProperties()}.
+	 *
+	 * <p><b>NOTE:</b> All the property keys should be included in {@link #supportedProperties()}.
+ 	 *
+	 * @return optional properties to disambiguate factories
+	 */
+	Map<String, String> optionalContext();
+
+	@Override
+	Map<String, String> requiredContext();
+
+	/**
+	 * {@inheritDoc}
+	 *
+	 * <p><b>NOTE:</b> All the property keys from {@link #optionalContext()} should also be included.
+	 */
+	@Override
+	List<String> supportedProperties();
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
new file mode 100644
index 0000000..db8e9ba
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ComponentFactoryService.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.AmbiguousTableFactoryException;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Unified class to search for a {@link ComponentFactory} of provided type and properties. It is similar to
+ * {@link TableFactoryService} but it disambiguates based on {@link ComponentFactory#optionalContext()}.
+ */
+@Internal
+public class ComponentFactoryService {
+
+	/**
+	 * Finds a table factory of the given class and property map. This method enables
+	 * disambiguating multiple matching {@link ComponentFactory}s based on additional
+	 * optional context provided via {@link ComponentFactory#optionalContext()}.
+	 *
+	 * @param factoryClass desired factory class
+	 * @param propertyMap properties that describe the factory configuration
+	 * @param <T> factory class type
+	 * @return the matching factory
+	 */
+	public static <T extends ComponentFactory> T find(Class<T> factoryClass, Map<String, String> propertyMap) {
+		List<T> all = TableFactoryService.findAll(factoryClass, propertyMap);
+
+		List<T> filtered = all.stream().filter(factory -> {
+			Map<String, String> optionalContext = factory.optionalContext();
+			return optionalContext.entrySet().stream().allMatch(entry -> {
+					String property = propertyMap.get(entry.getKey());
+					if (property != null) {
+						return property.equals(entry.getValue());
+					} else {
+						return true;
+					}
+				}
+			);
+		}).collect(Collectors.toList());
+
+		if (filtered.size() > 1) {
+			throw new AmbiguousTableFactoryException(
+				filtered,
+				factoryClass,
+				new ArrayList<>(all),
+				propertyMap
+			);
+		} else if (filtered.isEmpty()) {
+			throw new NoMatchingTableFactoryException(
+				"No factory supports the additional filters.",
+				factoryClass,
+				new ArrayList<>(all),
+				propertyMap);
+		} else {
+			return filtered.get(0);
+		}
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
new file mode 100644
index 0000000..13e821d
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ComponentFactoryServiceTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.NoMatchingTableFactoryException;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.utils.TestPlannerFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ComponentFactoryService}.
+ */
+public class ComponentFactoryServiceTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testLookingUpAmbiguousPlanners() {
+		Map<String, String> properties = new HashMap<>();
+		properties.put(EnvironmentSettings.CLASS_NAME, TestPlannerFactory.class.getCanonicalName());
+		properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+		properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
+
+		PlannerFactory plannerFactory = ComponentFactoryService.find(PlannerFactory.class, properties);
+
+		assertThat(plannerFactory, instanceOf(TestPlannerFactory.class));
+	}
+
+	@Test
+	public void testLookingUpNonExistentClass() {
+		thrown.expect(NoMatchingTableFactoryException.class);
+		thrown.expectMessage("Reason: No factory supports the additional filters");
+
+		Map<String, String> properties = new HashMap<>();
+		properties.put(EnvironmentSettings.CLASS_NAME, "NoSuchClass");
+		properties.put(EnvironmentSettings.BATCH_MODE, Boolean.toString(true));
+		properties.put(TestPlannerFactory.PLANNER_TYPE_KEY, TestPlannerFactory.PLANNER_TYPE_VALUE);
+
+		ComponentFactoryService.find(PlannerFactory.class, properties);
+	}
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
new file mode 100644
index 0000000..c4c3804
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/OtherTestPlannerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils;
+
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.factories.ComponentFactoryServiceTest;
+
+/**
+ * Test {@link Planner} factory used in {@link ComponentFactoryServiceTest}.
+ */
+public class OtherTestPlannerFactory extends TestPlannerFactory {
+}
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
new file mode 100644
index 0000000..1a0f05f
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/utils/TestPlannerFactory.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.factories.utils;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test {@link Planner} factory used in {@link org.apache.flink.table.factories.ComponentFactoryServiceTest}.
+ */
+public class TestPlannerFactory implements PlannerFactory {
+
+	public static final String PLANNER_TYPE_KEY = "planner-type";
+	public static final String PLANNER_TYPE_VALUE = "test-planner";
+
+	@Override
+	public Planner create(
+		Map<String, String> properties, Executor executor,
+		TableConfig tableConfig,
+		FunctionCatalog functionCatalog,
+		CatalogManager catalogManager) {
+		return null;
+	}
+
+	@Override
+	public Map<String, String> optionalContext() {
+		HashMap<String, String> map = new HashMap<>();
+		map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+		return map;
+	}
+
+	@Override
+	public Map<String, String> requiredContext() {
+		Map<String, String> map = new HashMap<>();
+		map.put(PLANNER_TYPE_KEY, PLANNER_TYPE_VALUE);
+		return map;
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		return Arrays.asList(EnvironmentSettings.CLASS_NAME, EnvironmentSettings.BATCH_MODE);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
similarity index 77%
copy from flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
copy to flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ece39eb..e2a6a48 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-api-java/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -13,7 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.table.sources.CsvBatchTableSourceFactory
-org.apache.flink.table.sources.CsvAppendTableSourceFactory
-org.apache.flink.table.sinks.CsvBatchTableSinkFactory
-org.apache.flink.table.sinks.CsvAppendTableSinkFactory
+org.apache.flink.table.factories.utils.TestPlannerFactory
+org.apache.flink.table.factories.utils.OtherTestPlannerFactory
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
index 7018af2..fa6b178 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/BatchTableEnvironment.scala
@@ -233,11 +233,12 @@ object BatchTableEnvironment {
           classOf[ExecutionEnvironment],
           classOf[TableConfig],
           classOf[CatalogManager])
+      val defaultCatalog = "default_catalog"
       val catalogManager = new CatalogManager(
-        tableConfig.getBuiltInCatalogName,
+        "default_catalog",
         new GenericInMemoryCatalog(
-          tableConfig.getBuiltInCatalogName,
-          tableConfig.getBuiltInDatabaseName)
+          defaultCatalog,
+          "default_database")
       )
       const.newInstance(executionEnvironment, tableConfig, catalogManager)
         .asInstanceOf[BatchTableEnvironment]
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
index 473522e..3baa5fa 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/StreamTableEnvironment.scala
@@ -17,28 +17,33 @@
  */
 package org.apache.flink.table.api.scala
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl
 import org.apache.flink.table.api.{TableEnvironment, _}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction}
 
 /**
-  * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-  * [[DataStream]]s.
+  * This table environment is the entry point and central context for creating Table & SQL
+  * API programs that integrate with the Scala-specific [[DataStream]] API.
   *
-  * A TableEnvironment can be used to:
-  * - convert a [[DataStream]] to a [[Table]]
-  * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-  * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-  * - scan a registered table to obtain a [[Table]]
-  * - specify a SQL query on registered tables to obtain a [[Table]]
-  * - convert a [[Table]] into a [[DataStream]]
-  * - explain the AST and execution plan of a [[Table]]
+  * It is unified for bounded and unbounded data processing.
+  *
+  * A stream table environment is responsible for:
+  *
+  * - Convert a [[DataStream]] into [[Table]] and vice-versa.
+  * - Connecting to external systems.
+  * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+  * - Executing SQL statements.
+  * - Offering further configuration options.
+  *
+  * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for pure
+  * table programs.
   */
+@PublicEvolving
 trait StreamTableEnvironment extends TableEnvironment {
 
   /**
@@ -245,43 +250,90 @@ trait StreamTableEnvironment extends TableEnvironment {
 object StreamTableEnvironment {
 
   /**
-    * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-    * [[DataStream]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataStream]] to a [[Table]]
-    * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataStream]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Convert a [[DataStream]] into [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
     */
   def create(executionEnvironment: StreamExecutionEnvironment): StreamTableEnvironment = {
-    create(executionEnvironment, new TableConfig)
+    create(
+      executionEnvironment,
+      EnvironmentSettings.newInstance().build())
   }
 
   /**
-    * The [[TableEnvironment]] for a Scala [[StreamExecutionEnvironment]] that works with
-    * [[DataStream]]s.
-    *
-    * A TableEnvironment can be used to:
-    * - convert a [[DataStream]] to a [[Table]]
-    * - register a [[DataStream]] in the [[TableEnvironment]]'s catalog
-    * - register a [[Table]] in the [[TableEnvironment]]'s catalog
-    * - scan a registered table to obtain a [[Table]]
-    * - specify a SQL query on registered tables to obtain a [[Table]]
-    * - convert a [[Table]] into a [[DataStream]]
-    * - explain the AST and execution plan of a [[Table]]
-    *
-    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the TableEnvironment.
-    * @param tableConfig The configuration of the TableEnvironment.
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Convert a [[DataStream]] into [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
+    * @param settings The environment settings used to instantiate the [[TableEnvironment]].
+    */
+  def create(
+      executionEnvironment: StreamExecutionEnvironment,
+      settings: EnvironmentSettings)
+    : StreamTableEnvironment = {
+    StreamTableEnvironmentImpl.create(executionEnvironment, settings, new TableConfig)
+  }
+
+  /**
+    * Creates a table environment that is the entry point and central context for creating Table &
+    * SQL API programs that integrate with the Scala-specific [[DataStream]] API.
+    *
+    * It is unified for bounded and unbounded data processing.
+    *
+    * A stream table environment is responsible for:
+    *
+    * - Convert a [[DataStream]] into [[Table]] and vice-versa.
+    * - Connecting to external systems.
+    * - Registering and retrieving [[Table]]s and other meta objects from a catalog.
+    * - Executing SQL statements.
+    * - Offering further configuration options.
+    *
+    * Note: If you don't intend to use the [[DataStream]] API, [[TableEnvironment]] is meant for
+    * pure table programs.
+    *
+    * @param executionEnvironment The Scala [[StreamExecutionEnvironment]] of the
+    *                             [[TableEnvironment]].
+    * @param tableConfig The configuration of the [[TableEnvironment]].
+    * @deprecated Use [[create(StreamExecutionEnvironment)]] and
+    *             [[StreamTableEnvironment#getConfig()]] for manipulating the [[TableConfig]].
     */
+  @deprecated
   def create(executionEnvironment: StreamExecutionEnvironment, tableConfig: TableConfig)
     : StreamTableEnvironment = {
 
-    StreamTableEnvironmentImpl.create(tableConfig, executionEnvironment)
+    StreamTableEnvironmentImpl
+      .create(
+        executionEnvironment,
+        EnvironmentSettings.newInstance().build(),
+        tableConfig)
   }
 }
diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
index 33b304d..96b7403 100644
--- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
+++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/scala/internal/StreamTableEnvironmentImpl.scala
@@ -17,9 +17,6 @@
  */
 package org.apache.flink.table.api.scala.internal
 
-import java.util
-import java.util.{Collections, List => JList}
-
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.dag.Transformation
@@ -29,18 +26,22 @@ import org.apache.flink.streaming.api.datastream.{DataStream => JDataStream}
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JStreamExecutionEnvironment}
 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 import org.apache.flink.table.api._
-import org.apache.flink.table.api.internal.{PlannerFactory, TableEnvironmentImpl}
+import org.apache.flink.table.api.internal.TableEnvironmentImpl
 import org.apache.flink.table.api.scala.StreamTableEnvironment
 import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
-import org.apache.flink.table.delegation.{Executor, Planner}
+import org.apache.flink.table.delegation.{Executor, ExecutorFactory, Planner, PlannerFactory}
 import org.apache.flink.table.descriptors.{ConnectorDescriptor, StreamTableDescriptor}
 import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.factories.ComponentFactoryService
 import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserFunctionsTypeHelper}
 import org.apache.flink.table.operations.{OutputConversionModifyOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.sources.{TableSource, TableSourceValidation}
 import org.apache.flink.table.types.utils.TypeConversions
 import org.apache.flink.table.typeutils.FieldInfoUtils
 
+import java.util
+import java.util.{Collections, List => JList, Map => JMap}
+
 import _root_.scala.collection.JavaConverters._
 
 /**
@@ -234,61 +235,54 @@ class StreamTableEnvironmentImpl (
 
 object StreamTableEnvironmentImpl {
 
-  /**
-    * Creates an instance of a [[StreamTableEnvironment]]. It uses the
-    * [[StreamExecutionEnvironment]] for executing queries. This is also the
-    * [[StreamExecutionEnvironment]] that will be used when converting
-    * from/to [[DataStream]].
-    *
-    * @param tableConfig The configuration of the TableEnvironment.
-    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment.
-    */
   def create(
-      tableConfig: TableConfig,
-      executionEnvironment: StreamExecutionEnvironment)
+      executionEnvironment: StreamExecutionEnvironment,
+      settings: EnvironmentSettings,
+      tableConfig: TableConfig)
     : StreamTableEnvironmentImpl = {
-    val catalogManager = new CatalogManager(
-      tableConfig.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(
-        tableConfig.getBuiltInCatalogName,
-        tableConfig.getBuiltInDatabaseName)
-    )
-    create(catalogManager, tableConfig, executionEnvironment)
-  }
-
-  /**
-    * Creates an instance of a [[StreamTableEnvironment]]. It uses the
-    * [[StreamExecutionEnvironment]] for executing queries. This is also the
-    * [[StreamExecutionEnvironment]] that will be used when converting
-    * from/to [[DataStream]].
-    *
-    * @param catalogManager The [[CatalogManager]] to use for storing and looking up [[Table]]s.
-    * @param tableConfig The configuration of the TableEnvironment.
-    * @param executionEnvironment The [[StreamExecutionEnvironment]] of the TableEnvironment.
-    */
-  def create(
-      catalogManager: CatalogManager,
-      tableConfig: TableConfig,
-      executionEnvironment: StreamExecutionEnvironment)
-    : StreamTableEnvironmentImpl = {
-    val executor = lookupExecutor(executionEnvironment)
+    val executorProperties = settings.toExecutorProperties
+    val plannerProperties = settings.toPlannerProperties
+    val executor = lookupExecutor(executorProperties, executionEnvironment)
     val functionCatalog = new FunctionCatalog(
-      catalogManager.getCurrentCatalog,
-      catalogManager.getCurrentDatabase)
+      settings.getBuiltInCatalogName,
+      settings.getBuiltInDatabaseName)
+    val catalogManager = new CatalogManager(
+      settings.getBuiltInCatalogName,
+      new GenericInMemoryCatalog(settings.getBuiltInCatalogName, settings.getBuiltInDatabaseName))
+    val planner = ComponentFactoryService.find(classOf[PlannerFactory], plannerProperties)
+      .create(
+        plannerProperties,
+        executor,
+        tableConfig,
+        functionCatalog,
+        catalogManager)
     new StreamTableEnvironmentImpl(
       catalogManager,
       functionCatalog,
       tableConfig,
       executionEnvironment,
-      PlannerFactory.lookupPlanner(executor, tableConfig, functionCatalog, catalogManager),
+      planner,
       executor)
   }
 
-  private def lookupExecutor(executionEnvironment: StreamExecutionEnvironment) =
+  private def lookupExecutor(
+      executorProperties: JMap[String, String],
+      executionEnvironment: StreamExecutionEnvironment)
+    :Executor =
     try {
-      val clazz = Class.forName("org.apache.flink.table.executor.ExecutorFactory")
-      val createMethod = clazz.getMethod("create", classOf[JStreamExecutionEnvironment])
-      createMethod.invoke(null, executionEnvironment.getWrappedStreamExecutionEnvironment)
+      val executorFactory = ComponentFactoryService
+        .find(classOf[ExecutorFactory], executorProperties)
+      val createMethod = executorFactory.getClass
+        .getMethod(
+          "create",
+          classOf[util.Map[String, String]],
+          classOf[JStreamExecutionEnvironment])
+
+      createMethod
+        .invoke(
+          executorFactory,
+          executorProperties,
+          executionEnvironment.getWrappedStreamExecutionEnvironment)
         .asInstanceOf[Executor]
     } catch {
       case e: Exception =>
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
index 5148839..426e09e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutor.java
@@ -19,6 +19,7 @@
 package org.apache.flink.table.executor;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,7 +35,8 @@ import java.util.List;
 public class StreamExecutor implements Executor {
 	private final StreamExecutionEnvironment executionEnvironment;
 
-	StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
+	@VisibleForTesting
+	public StreamExecutor(StreamExecutionEnvironment executionEnvironment) {
 		this.executionEnvironment = executionEnvironment;
 	}
 
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
similarity index 51%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
index 5534217..dff5590 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/ExecutorFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/executor/StreamExecutorFactory.java
@@ -20,36 +20,59 @@ package org.apache.flink.table.executor;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Factory to create an implementation of {@link Executor} to use in a
  * {@link org.apache.flink.table.api.TableEnvironment}. The {@link org.apache.flink.table.api.TableEnvironment}
- * should use {@link #create()} method that does not bind to any particular environment,
+ * should use {@link #create(Map)} method that does not bind to any particular environment,
  * whereas {@link org.apache.flink.table.api.scala.StreamTableEnvironment} should use
- * {@link #create(StreamExecutionEnvironment)} as it is always backed by some {@link StreamExecutionEnvironment}
+ * {@link #create(Map, StreamExecutionEnvironment)} as it is always backed by
+ * some {@link StreamExecutionEnvironment}
  */
 @Internal
-public class ExecutorFactory {
+public class StreamExecutorFactory implements ExecutorFactory {
+
 	/**
-	 * Creates a {@link StreamExecutor} that is backed by given {@link StreamExecutionEnvironment}.
+	 * Creates a corresponding {@link StreamExecutor}.
 	 *
+	 * @param properties Static properties of the {@link Executor}, the same that were used for factory lookup.
 	 * @param executionEnvironment a {@link StreamExecutionEnvironment} to use while executing Table programs.
-	 * @return {@link StreamExecutor}
+	 * @return instance of a {@link Executor}
 	 */
-	public static Executor create(StreamExecutionEnvironment executionEnvironment) {
+	public Executor create(Map<String, String> properties, StreamExecutionEnvironment executionEnvironment) {
 		return new StreamExecutor(executionEnvironment);
 	}
 
-	/**
-	 * Creates a {@link StreamExecutor} that is backed by a default {@link StreamExecutionEnvironment}.
-	 *
-	 * @return {@link StreamExecutor}
-	 */
-	public static Executor create() {
+	@Override
+	public Executor create(Map<String, String> properties) {
 		return new StreamExecutor(StreamExecutionEnvironment.getExecutionEnvironment());
 	}
 
-	private ExecutorFactory() {
+	@Override
+	public Map<String, String> requiredContext() {
+		DescriptorProperties properties = new DescriptorProperties();
+		properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+		return properties.asMap();
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		return Collections.singletonList(EnvironmentSettings.CLASS_NAME);
+	}
+
+	@Override
+	public Map<String, String> optionalContext() {
+		Map<String, String> context = new HashMap<>();
+		context.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+		return context;
 	}
 }
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
new file mode 100644
index 0000000..4efb850
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/StreamPlannerFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Factory to construct a {@link StreamPlanner}.
+ */
+@Internal
+public final class StreamPlannerFactory implements PlannerFactory {
+
+	@Override
+	public Planner create(
+		Map<String, String> properties,
+		Executor executor,
+		TableConfig tableConfig,
+		FunctionCatalog functionCatalog,
+		CatalogManager catalogManager) {
+		return new StreamPlanner(executor, tableConfig, functionCatalog, catalogManager);
+	}
+
+	public Map<String, String> optionalContext() {
+		Map<String, String> map = new HashMap<>();
+		map.put(EnvironmentSettings.CLASS_NAME, this.getClass().getCanonicalName());
+		return map;
+	}
+
+	@Override
+	public Map<String, String> requiredContext() {
+		DescriptorProperties properties = new DescriptorProperties();
+
+		properties.putBoolean(EnvironmentSettings.BATCH_MODE, false);
+		return properties.asMap();
+	}
+
+	@Override
+	public List<String> supportedProperties() {
+		return Collections.singletonList(EnvironmentSettings.CLASS_NAME);
+	}
+}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
index ece39eb..6bd2ea3 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
@@ -17,3 +17,5 @@ org.apache.flink.table.sources.CsvBatchTableSourceFactory
 org.apache.flink.table.sources.CsvAppendTableSourceFactory
 org.apache.flink.table.sinks.CsvBatchTableSinkFactory
 org.apache.flink.table.sinks.CsvAppendTableSinkFactory
+org.apache.flink.table.planner.StreamPlannerFactory
+org.apache.flink.table.executor.StreamExecutorFactory
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
index 2b51869..efa29ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/api/internal/TableEnvImpl.scala
@@ -52,8 +52,8 @@ abstract class TableEnvImpl(
     private val catalogManager: CatalogManager)
   extends TableEnvironment {
 
-  protected val defaultCatalogName: String = config.getBuiltInCatalogName
-  protected val defaultDatabaseName: String = config.getBuiltInDatabaseName
+  protected val defaultCatalogName: String = catalogManager.getCurrentCatalog
+  protected val defaultDatabaseName: String = catalogManager.getCurrentDatabase
 
   // Table API/SQL function catalog
   private[flink] val functionCatalog: FunctionCatalog = new FunctionCatalog(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
index bdb2913..44dc4ef 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/stream/StreamTableEnvironmentTest.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.api.stream
 
-import java.lang.{Integer => JInt, Long => JLong}
-
 import org.apache.flink.api.java.tuple.{Tuple5 => JTuple5}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.api.scala._
@@ -31,14 +29,19 @@ import org.apache.flink.table.api.java.internal.{StreamTableEnvironmentImpl => J
 import org.apache.flink.table.api.java.{StreamTableEnvironment => JStreamTableEnv}
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.{TableConfig, Types, ValidationException}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.executor.StreamExecutor
+import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.runtime.utils.StreamTestData
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.table.utils.TableTestUtil.{binaryNode, streamTableNode, term, unaryNode}
 import org.apache.flink.types.Row
+
 import org.junit.Test
 import org.mockito.Mockito.{mock, when}
 
+import java.lang.{Integer => JInt, Long => JLong}
+
 class StreamTableEnvironmentTest extends TableTestBase {
 
   @Test
@@ -200,12 +203,19 @@ class StreamTableEnvironmentTest extends TableTestBase {
     val jStreamExecEnv = mock(classOf[JStreamExecEnv])
     when(jStreamExecEnv.getStreamTimeCharacteristic).thenReturn(TimeCharacteristic.EventTime)
     val config = new TableConfig
-    val jTEnv = JStreamTableEnvironmentImpl.create(
-      new CatalogManager(
-        config.getBuiltInCatalogName,
-        new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName)),
+    val manager: CatalogManager = new CatalogManager(
+      "default_catalog",
+      new GenericInMemoryCatalog("default_catalog", "default_database"))
+    val executor: StreamExecutor = new StreamExecutor(jStreamExecEnv)
+    val functionCatalog = new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+    val streamPlanner = new StreamPlanner(executor, config, functionCatalog, manager)
+    val jTEnv = new JStreamTableEnvironmentImpl(
+      manager,
+      functionCatalog,
       config,
-      jStreamExecEnv)
+      jStreamExecEnv,
+      streamPlanner,
+      executor)
 
     val sType = new TupleTypeInfo(Types.LONG, Types.INT, Types.STRING, Types.INT, Types.LONG)
       .asInstanceOf[TupleTypeInfo[JTuple5[JLong, JInt, String, JInt, JLong]]]
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
index 2ca6ee3..e72225d 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/utils/TableTestBase.scala
@@ -18,8 +18,6 @@
 
 package org.apache.flink.table.utils
 
-import org.apache.calcite.plan.RelOptUtil
-import org.apache.calcite.rel.RelNode
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{LocalEnvironment, DataSet => JDataSet}
 import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
@@ -32,12 +30,16 @@ import org.apache.flink.table.api.java.internal.{BatchTableEnvironmentImpl => Ja
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.internal.{BatchTableEnvironmentImpl => ScalaBatchTableEnvironmentImpl, StreamTableEnvironmentImpl => ScalaStreamTableEnvironmentImpl}
 import org.apache.flink.table.api.{Table, TableConfig, TableSchema}
-import org.apache.flink.table.catalog.{CatalogManager, GenericInMemoryCatalog}
+import org.apache.flink.table.catalog.{CatalogManager, FunctionCatalog, GenericInMemoryCatalog}
+import org.apache.flink.table.executor.StreamExecutor
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.operations.{DataSetQueryOperation, JavaDataStreamQueryOperation, ScalaDataStreamQueryOperation}
 import org.apache.flink.table.planner.StreamPlanner
 import org.apache.flink.table.utils.TableTestUtil.createCatalogManager
+
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rel.RelNode
 import org.junit.Assert.assertEquals
 import org.junit.rules.ExpectedException
 import org.junit.{ComparisonFailure, Rule}
@@ -137,25 +139,11 @@ object TableTestUtil {
 
   val ANY_SUBTREE = "%ANY_SUBTREE%"
 
-  /**
-    * Creates a [[CatalogManager]] with a builtin default catalog & database set to values
-    * specified in the [[TableConfig]].
-    */
-  def createCatalogManager(config: TableConfig): CatalogManager = {
+  def createCatalogManager(): CatalogManager = {
+    val defaultCatalog = "default_catalog"
     new CatalogManager(
-      config.getBuiltInCatalogName,
-      new GenericInMemoryCatalog(config.getBuiltInCatalogName, config.getBuiltInDatabaseName))
-  }
-
-  /**
-    * Sets the configuration of the builtin catalog & databases in [[TableConfig]]
-    * to the current catalog & database of the given [[CatalogManager]]. This should be used
-    * to ensure sanity of a [[org.apache.flink.table.api.TableEnvironment]].
-    */
-  def extractBuiltinPath(config: TableConfig, catalogManager: CatalogManager): TableConfig = {
-    config.setBuiltInCatalogName(catalogManager.getCurrentCatalog)
-    config.setBuiltInDatabaseName(catalogManager.getCurrentDatabase)
-    config
+      defaultCatalog,
+      new GenericInMemoryCatalog(defaultCatalog, "default_database"))
   }
 
   private[utils] def toRelNode(expected: Table) = {
@@ -239,22 +227,15 @@ case class BatchTableTestUtil(
   extends TableTestUtil {
   val javaEnv = new LocalEnvironment()
 
-  private def tableConfig = catalogManager match {
-    case Some(c) =>
-      TableTestUtil.extractBuiltinPath(new TableConfig, c)
-    case None =>
-      new TableConfig
-  }
-
   val javaTableEnv = new JavaBatchTableEnvironmentImpl(
     javaEnv,
-    tableConfig,
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)))
+    new TableConfig,
+    catalogManager.getOrElse(createCatalogManager()))
   val env = new ExecutionEnvironment(javaEnv)
   val tableEnv = new ScalaBatchTableEnvironmentImpl(
     env,
-    tableConfig,
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)))
+    new TableConfig,
+    catalogManager.getOrElse(createCatalogManager()))
 
   def addTable[T: TypeInformation](
       name: String,
@@ -344,22 +325,29 @@ case class StreamTableTestUtil(
   val javaEnv = new LocalStreamEnvironment()
   javaEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
-  private def tableConfig = catalogManager match {
-    case Some(c) =>
-      TableTestUtil.extractBuiltinPath(new TableConfig, c)
-    case None =>
-      new TableConfig
-  }
+  private val tableConfig = new TableConfig
+  private val manager: CatalogManager = catalogManager.getOrElse(createCatalogManager())
+  private val executor: StreamExecutor = new StreamExecutor(javaEnv)
+  private val functionCatalog =
+    new FunctionCatalog(manager.getCurrentCatalog, manager.getCurrentDatabase)
+  private val streamPlanner = new StreamPlanner(executor, tableConfig, functionCatalog, manager)
 
-  val javaTableEnv = JavaStreamTableEnvironmentImpl.create(
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)),
+  val javaTableEnv = new JavaStreamTableEnvironmentImpl(
+    manager,
+    functionCatalog,
     tableConfig,
-    javaEnv)
+    javaEnv,
+    streamPlanner,
+    executor)
+
   val env = new StreamExecutionEnvironment(javaEnv)
-  val tableEnv = ScalaStreamTableEnvironmentImpl.create(
-    catalogManager.getOrElse(createCatalogManager(new TableConfig)),
+  val tableEnv = new ScalaStreamTableEnvironmentImpl(
+    manager,
+    functionCatalog,
     tableConfig,
-    env)
+    env,
+    streamPlanner,
+    executor)
 
   def addTable[T: TypeInformation](
       name: String,