You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/08/07 16:33:43 UTC

[flink] branch release-1.9 updated: [FLINK-13521][sql-client] Allow setting configurations in SQL CLI

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 41b8982  [FLINK-13521][sql-client] Allow setting configurations in SQL CLI
41b8982 is described below

commit 41b898295e66beb96082624ee2801748e6568793
Author: TsReaper <ts...@gmail.com>
AuthorDate: Fri Aug 2 11:15:23 2019 +0800

    [FLINK-13521][sql-client] Allow setting configurations in SQL CLI
    
    This closes #9328.
---
 docs/dev/table/sqlClient.md                        | 19 +++--
 .../flink-sql-client/conf/sql-client-defaults.yaml | 19 ++++-
 .../flink/table/client/config/Environment.java     | 22 ++++++
 .../client/config/entries/ConfigurationEntry.java  | 81 ++++++++++++++++++++++
 .../client/gateway/local/ExecutionContext.java     |  4 ++
 .../table/client/gateway/local/LocalExecutor.java  |  1 +
 .../client/gateway/local/EnvironmentTest.java      |  5 ++
 .../client/gateway/local/ExecutionContextTest.java | 43 ++++++++++++
 .../client/gateway/local/LocalExecutorITCase.java  |  1 +
 ...ory.yaml => test-sql-client-configuration.yaml} | 41 +++--------
 .../test/resources/test-sql-client-defaults.yaml   |  3 +
 .../test/resources/test-sql-client-factory.yaml    |  3 +
 .../table/api/config/ExecutionConfigOptions.java   |  2 +
 .../table/api/config/OptimizerConfigOptions.java   |  2 +
 14 files changed, 206 insertions(+), 40 deletions(-)

diff --git a/docs/dev/table/sqlClient.md b/docs/dev/table/sqlClient.md
index 8ec569d..059cd94 100644
--- a/docs/dev/table/sqlClient.md
+++ b/docs/dev/table/sqlClient.md
@@ -213,7 +213,7 @@ catalogs:
      hive-conf-dir: ...
      hive-version: 1.2.1
 
-# Execution properties allow for changing the behavior of a table program.
+# Properties that change the fundamental execution behavior of a table program.
 
 execution:
   planner: old                      # optional: either 'old' (default) or 'blink'
@@ -233,7 +233,16 @@ execution:
   restart-strategy:                 # optional: restart strategy
     type: fallback                  #   "fallback" to global restart strategy by default
 
-# Deployment properties allow for describing the cluster to which table programs are submitted to.
+# Configuration options for adjusting and tuning table programs.
+
+# A full list of options and their default values can be found
+# on the dedicated "Configuration" page.
+configuration:
+  table.optimizer.join-reorder-enabled: true
+  table.exec.spill-compression.enabled: true
+  table.exec.spill-compression.block-size: 128kb
+
+# Properties that describe the cluster to which table programs are submitted to.
 
 deployment:
   response-timeout: 5000
@@ -245,9 +254,9 @@ This configuration:
 - defines a view `MyCustomView` that declares a virtual table using a SQL query,
 - defines a user-defined function `myUDF` that can be instantiated using the class name and two constructor parameters,
 - connects to two Hive catalogs and uses `catalog_1` as the current catalog with `mydb1` as the current database of the catalog,
-- specifies a parallelism of 1 for queries executed in this streaming environment,
-- specifies an event-time characteristic, and
-- runs queries in the `table` result mode.
+- uses the old planner in streaming mode for running statements with event-time characteristic and a parallelism of 1,
+- runs exploratory queries in the `table` result mode,
+- and makes some planner adjustments around join reordering and spilling via configuration options.
 
 Depending on the use case, a configuration can be split into multiple files. Therefore, environment files can be created for general purposes (*defaults environment file* using `--defaults`) as well as on a per-session basis (*session environment file* using `--environment`). Every CLI session is initialized with the default properties followed by the session properties. For example, the defaults environment file could specify all table sources that should be available for querying in ev [...]
 
diff --git a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
index db6a0ea..f6e8340 100644
--- a/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/conf/sql-client-defaults.yaml
@@ -83,7 +83,7 @@ catalogs: [] # empty list
 # Execution properties
 #==============================================================================
 
-# Execution properties allow for changing the behavior of a table program.
+# Properties that change the fundamental execution behavior of a table program.
 
 execution:
   # select the implementation responsible for planning table programs
@@ -117,13 +117,26 @@ execution:
     # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
     type: fallback
 
+#==============================================================================
+# Configuration options
+#==============================================================================
+
+# Configuration options for adjusting and tuning table programs.
+
+# A full list of options and their default values can be found
+# on the dedicated "Configuration" web page.
+
+# A configuration can look like:
+# configuration:
+#   table.exec.spill-compression.enabled: true
+#   table.exec.spill-compression.block-size: 128kb
+#   table.optimizer.join-reorder-enabled: true
 
 #==============================================================================
 # Deployment properties
 #==============================================================================
 
-# Deployment properties allow for describing the cluster to which table
-# programs are submitted to.
+# Properties that describe the cluster to which table programs are submitted to.
 
 deployment:
   # general cluster communication timeout in ms
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
index 64c9453..39bced9 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/Environment.java
@@ -20,6 +20,7 @@ package org.apache.flink.table.client.config;
 
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.entries.CatalogEntry;
+import org.apache.flink.table.client.config.entries.ConfigurationEntry;
 import org.apache.flink.table.client.config.entries.DeploymentEntry;
 import org.apache.flink.table.client.config.entries.ExecutionEntry;
 import org.apache.flink.table.client.config.entries.FunctionEntry;
@@ -48,6 +49,8 @@ public class Environment {
 
 	public static final String EXECUTION_ENTRY = "execution";
 
+	public static final String CONFIGURATION_ENTRY = "table";
+
 	public static final String DEPLOYMENT_ENTRY = "deployment";
 
 	private Map<String, CatalogEntry> catalogs;
@@ -58,6 +61,8 @@ public class Environment {
 
 	private ExecutionEntry execution;
 
+	private ConfigurationEntry configuration;
+
 	private DeploymentEntry deployment;
 
 	public Environment() {
@@ -65,6 +70,7 @@ public class Environment {
 		this.tables = Collections.emptyMap();
 		this.functions = Collections.emptyMap();
 		this.execution = ExecutionEntry.DEFAULT_INSTANCE;
+		this.configuration = ConfigurationEntry.DEFAULT_INSTANCE;
 		this.deployment = DeploymentEntry.DEFAULT_INSTANCE;
 	}
 
@@ -128,6 +134,14 @@ public class Environment {
 		return execution;
 	}
 
+	public void setConfiguration(Map<String, Object> config) {
+		this.configuration = ConfigurationEntry.create(config);
+	}
+
+	public ConfigurationEntry getConfiguration() {
+		return configuration;
+	}
+
 	public void setDeployment(Map<String, Object> config) {
 		this.deployment = DeploymentEntry.create(config);
 	}
@@ -156,6 +170,8 @@ public class Environment {
 		});
 		sb.append("=================== Execution ====================\n");
 		execution.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+		sb.append("================== Configuration =================\n");
+		configuration.asMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
 		sb.append("=================== Deployment ===================\n");
 		deployment.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
 		return sb.toString();
@@ -209,6 +225,9 @@ public class Environment {
 		// merge execution properties
 		mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), env2.getExecution());
 
+		// merge configuration properties
+		mergedEnv.configuration = ConfigurationEntry.merge(env1.getConfiguration(), env2.getConfiguration());
+
 		// merge deployment properties
 		mergedEnv.deployment = DeploymentEntry.merge(env1.getDeployment(), env2.getDeployment());
 
@@ -237,6 +256,9 @@ public class Environment {
 		// enrich execution properties
 		enrichedEnv.execution = ExecutionEntry.enrich(env.execution, properties);
 
+		// enrich configuration properties
+		enrichedEnv.configuration = ConfigurationEntry.enrich(env.configuration, properties);
+
 		// enrich deployment properties
 		enrichedEnv.deployment = DeploymentEntry.enrich(env.deployment, properties);
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java
new file mode 100644
index 0000000..08c0c67
--- /dev/null
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/config/entries/ConfigurationEntry.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.client.config.entries;
+
+import org.apache.flink.table.client.config.ConfigUtil;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.table.client.config.Environment.CONFIGURATION_ENTRY;
+
+/**
+ * Configuration for configuring {@link org.apache.flink.table.api.TableConfig}.
+ */
+public class ConfigurationEntry extends ConfigEntry {
+
+	public static final ConfigurationEntry DEFAULT_INSTANCE =
+		new ConfigurationEntry(new DescriptorProperties(true));
+
+	private ConfigurationEntry(DescriptorProperties properties) {
+		super(properties);
+	}
+
+	@Override
+	protected void validate(DescriptorProperties properties) {
+		// Nothing to validate as the planner will check the options
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	public static ConfigurationEntry create(Map<String, Object> config) {
+		return new ConfigurationEntry(ConfigUtil.normalizeYaml(config));
+	}
+
+	/**
+	 * Merges two configuration entries. The properties of the first configuration entry might be
+	 * overwritten by the second one.
+	 */
+	public static ConfigurationEntry merge(ConfigurationEntry configuration1, ConfigurationEntry configuration2) {
+		final Map<String, String> mergedProperties = new HashMap<>(configuration1.asMap());
+		mergedProperties.putAll(configuration2.asMap());
+
+		final DescriptorProperties properties = new DescriptorProperties(true);
+		properties.putProperties(mergedProperties);
+
+		return new ConfigurationEntry(properties);
+	}
+
+	public static ConfigurationEntry enrich(ConfigurationEntry configuration, Map<String, String> prefixedProperties) {
+		final Map<String, String> enrichedProperties = new HashMap<>(configuration.asMap());
+
+		prefixedProperties.forEach((k, v) -> {
+			final String normalizedKey = k.toLowerCase();
+			if (k.startsWith(CONFIGURATION_ENTRY + ".")) {
+				enrichedProperties.put(normalizedKey, v);
+			}
+		});
+
+		final DescriptorProperties properties = new DescriptorProperties(true);
+		properties.putProperties(enrichedProperties);
+
+		return new ConfigurationEntry(properties);
+	}
+}
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
index f212e98..27c3ee2 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java
@@ -368,6 +368,10 @@ public class ExecutionContext<T> {
 				throw new SqlExecutionException("Unsupported execution type specified.");
 			}
 
+			// set table configuration
+			mergedEnv.getConfiguration().asMap().forEach((k, v) ->
+				tableEnv.getConfig().getConfiguration().setString(k, v));
+
 			// register catalogs
 			catalogs.forEach(tableEnv::registerCatalog);
 
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index a7fd6b1..3394df7 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -187,6 +187,7 @@ public class LocalExecutor implements Executor {
 		final Map<String, String> properties = new HashMap<>();
 		properties.putAll(env.getExecution().asTopLevelMap());
 		properties.putAll(env.getDeployment().asTopLevelMap());
+		properties.putAll(env.getConfiguration().asMap());
 		return properties;
 	}
 
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
index 2ad69e4..94d6fa2 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/EnvironmentTest.java
@@ -79,6 +79,11 @@ public class EnvironmentTest {
 		assertEquals(tables, merged.getTables().keySet());
 		assertTrue(merged.getExecution().inStreamingMode());
 		assertEquals(16, merged.getExecution().getMaxParallelism());
+
+		final Map<String, String> configuration = new HashMap<>();
+		configuration.put("table.optimizer.join-reorder-enabled", "true");
+
+		assertEquals(configuration, merged.getConfiguration().asMap());
 	}
 
 	@Test
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
index aaa8321..a3a9ce9 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/ExecutionContextTest.java
@@ -25,6 +25,8 @@ import org.apache.flink.client.cli.DefaultCLI;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.Types;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
@@ -60,6 +62,7 @@ public class ExecutionContextTest {
 	private static final String DEFAULTS_ENVIRONMENT_FILE = "test-sql-client-defaults.yaml";
 	private static final String CATALOGS_ENVIRONMENT_FILE = "test-sql-client-catalogs.yaml";
 	private static final String STREAMING_ENVIRONMENT_FILE = "test-sql-client-streaming.yaml";
+	private static final String CONFIGURATION_ENVIRONMENT_FILE = "test-sql-client-configuration.yaml";
 
 	@Test
 	public void testExecutionConfig() throws Exception {
@@ -230,6 +233,42 @@ public class ExecutionContextTest {
 			tableEnv.scan("TemporalTableUsage").getSchema().getFieldNames());
 	}
 
+	@Test
+	public void testConfiguration() throws Exception {
+		final ExecutionContext<?> context = createConfigurationExecutionContext();
+		final TableEnvironment tableEnv = context.createEnvironmentInstance().getTableEnvironment();
+
+		assertEquals(
+			100,
+			tableEnv.getConfig().getConfiguration().getInteger(
+				ExecutionConfigOptions.TABLE_EXEC_SORT_DEFAULT_LIMIT));
+		assertTrue(
+			tableEnv.getConfig().getConfiguration().getBoolean(
+				ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED));
+		assertEquals(
+			"128kb",
+			tableEnv.getConfig().getConfiguration().getString(
+				ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE));
+
+		assertTrue(
+			tableEnv.getConfig().getConfiguration().getBoolean(
+				OptimizerConfigOptions.TABLE_OPTIMIZER_JOIN_REORDER_ENABLED));
+
+		// these options are not modified and should be equal to their default value
+		assertEquals(
+			ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED.defaultValue(),
+			tableEnv.getConfig().getConfiguration().getBoolean(
+				ExecutionConfigOptions.TABLE_EXEC_SORT_ASYNC_MERGE_ENABLED));
+		assertEquals(
+			ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE.defaultValue(),
+			tableEnv.getConfig().getConfiguration().getString(
+				ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE));
+		assertEquals(
+			OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD.defaultValue().longValue(),
+			tableEnv.getConfig().getConfiguration().getLong(
+				OptimizerConfigOptions.TABLE_OPTIMIZER_BROADCAST_JOIN_THRESHOLD));
+	}
+
 	private <T> ExecutionContext<T> createExecutionContext(String file, Map<String, String> replaceVars) throws Exception {
 		final Environment env = EnvironmentFileUtil.parseModified(
 			file,
@@ -272,4 +311,8 @@ public class ExecutionContextTest {
 		replaceVars.put("$VAR_CONNECTOR_PROPERTY_VALUE", "");
 		return createExecutionContext(STREAMING_ENVIRONMENT_FILE, replaceVars);
 	}
+
+	private <T> ExecutionContext<T> createConfigurationExecutionContext() throws Exception {
+		return createExecutionContext(CONFIGURATION_ENVIRONMENT_FILE, new HashMap<>());
+	}
 }
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 838b95a..963314b 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -250,6 +250,7 @@ public class LocalExecutorITCase extends TestLogger {
 		expectedProperties.put("execution.restart-strategy.max-failures-per-interval", "10");
 		expectedProperties.put("execution.restart-strategy.failure-rate-interval", "99000");
 		expectedProperties.put("execution.restart-strategy.delay", "1000");
+		expectedProperties.put("table.optimizer.join-reorder-enabled", "false");
 		expectedProperties.put("deployment.response-timeout", "5000");
 
 		assertEquals(expectedProperties, actualProperties);
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml
similarity index 59%
copy from flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
copy to flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml
index de60538..2e1b324 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-configuration.yaml
@@ -18,39 +18,16 @@
 
 #==============================================================================
 # TEST ENVIRONMENT FILE
-# Intended for org.apache.flink.table.client.gateway.local.DependencyTest.
+# This test file is to check whether the configuration can be successfully set.
 #==============================================================================
 
-# this file has variables that can be filled with content by replacing $VAR_XXX
-
-tables:
-  - name: TableNumber1
-    type: source-sink-table
-    update-mode: append
-    schema:
-      - name: IntegerField1
-        type: INT
-      - name: StringField1
-        type: VARCHAR
-      - name: rowtimeField
-        type: TIMESTAMP
-        rowtime:
-          timestamps:
-            type: from-field
-            from: rowtimeField
-          watermarks:
-            type: from-source
-    connector:
-      type: "$VAR_CONNECTOR_TYPE"
-      $VAR_CONNECTOR_PROPERTY: "$VAR_CONNECTOR_PROPERTY_VALUE"
-
 execution:
-  type: streaming
-  parallelism: 1
-
-deployment:
-  response-timeout: 5000
+  planner: blink
+  type: batch
+  result-mode: table
 
-catalogs:
-  - name: catalog2
-    type: DependencyTest
+configuration:
+  table.exec.sort.default-limit: 100
+  table.exec.spill-compression.enabled: true
+  table.exec.spill-compression.block-size: 128kb
+  table.optimizer.join-reorder-enabled: true
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
index d3e917d..9b6b50c 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-defaults.yaml
@@ -139,5 +139,8 @@ execution:
     failure-rate-interval: 99000
     delay: 1000
 
+configuration:
+  table.optimizer.join-reorder-enabled: false
+
 deployment:
   response-timeout: 5000
diff --git a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml b/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
index de60538..6c6ec14 100644
--- a/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
+++ b/flink-table/flink-sql-client/src/test/resources/test-sql-client-factory.yaml
@@ -48,6 +48,9 @@ execution:
   type: streaming
   parallelism: 1
 
+configuration:
+  table.optimizer.join-reorder-enabled: true
+
 deployment:
   response-timeout: 5000
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 81b464d..8d14dd6 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -27,6 +27,8 @@ import static org.apache.flink.configuration.ConfigOptions.key;
  * This class holds configuration constants used by Flink's table module.
  *
  * <p>This is only used for the Blink planner.
+ *
+ * <p>NOTE: All option keys in this class must start with "table.exec".
  */
 public class ExecutionConfigOptions {
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
index c9ad6ef..2256954 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/OptimizerConfigOptions.java
@@ -27,6 +27,8 @@ import static org.apache.flink.configuration.ConfigOptions.key;
  * This class holds configuration constants used by Flink's table planner module.
  *
  * <p>This is only used for the Blink planner.
+ *
+ * <p>NOTE: All option keys in this class must start with "table.optimizer".
  */
 public class OptimizerConfigOptions {