You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/02 09:58:35 UTC
[flink] branch master updated: [FLINK-26434][table] Remove 'table.planner' config option and remaining Blink references
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 92eb824 [FLINK-26434][table] Remove 'table.planner' config option and remaining Blink references
92eb824 is described below
commit 92eb824ba1e3ac4a6ee28ab4a68e508766236dab
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Tue Feb 22 17:00:42 2022 +0100
[FLINK-26434][table] Remove 'table.planner' config option and remaining Blink references
This closes #18886.
---
flink-python/pyflink/table/environment_settings.py | 73 ----------------------
.../client/gateway/context/ExecutionContext.java | 5 --
.../flink/table/api/EnvironmentSettings.java | 70 ---------------------
.../org/apache/flink/table/api/PlannerType.java | 40 ------------
.../flink/table/api/config/TableConfigOptions.java | 12 ----
.../table/planner/delegation/PlannerBase.scala | 7 ---
.../flink/table/api/TableEnvironmentITCase.scala | 19 ------
7 files changed, 226 deletions(-)
diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py
index f4b8d32..578567d 100644
--- a/flink-python/pyflink/table/environment_settings.py
+++ b/flink-python/pyflink/table/environment_settings.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-import warnings
from pyflink.java_gateway import get_gateway
from pyflink.common import Configuration
@@ -51,56 +50,6 @@ class EnvironmentSettings(object):
gateway = get_gateway()
self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
- def use_old_planner(self) -> 'EnvironmentSettings.Builder':
- """
- .. note:: The old planner has been removed in Flink 1.14. Since there is only one
- planner left (previously called the 'blink' planner), this setting will
- throw an exception.
- """
- warnings.warn(
- "Deprecated in 1.13. Please update to the new planner (i.e. Blink planner).",
- DeprecationWarning)
- self._j_builder = self._j_builder.useOldPlanner()
- return self
-
- def use_blink_planner(self) -> 'EnvironmentSettings.Builder':
- """
- Sets the Blink planner as the required module.
-
- This is the default behavior.
-
- .. note:: The old planner has been removed in Flink 1.14. Since there is only one
- planner left (previously called the 'blink' planner), this setting is
- obsolete and will be removed in future versions.
-
- :return: This object.
- """
- warnings.warn(
- "Deprecated in 1.14. A planner declaration is not required anymore.",
- DeprecationWarning)
- self._j_builder = self._j_builder.useBlinkPlanner()
- return self
-
- def use_any_planner(self) -> 'EnvironmentSettings.Builder':
- """
- Does not set a planner requirement explicitly.
-
- A planner will be discovered automatically, if there is only one planner available.
-
- By default, :func:`use_blink_planner` is enabled.
-
- .. note:: The old planner has been removed in Flink 1.14. Since there is only one
- planner left (previously called the 'blink' planner), this setting is
- obsolete and will be removed in future versions.
-
- :return: This object.
- """
- warnings.warn(
- "Deprecated in 1.14. A planner declaration is not required anymore.",
- DeprecationWarning)
- self._j_builder = self._j_builder.useAnyPlanner()
- return self
-
def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
"""
Sets that the components should work in a batch mode. Streaming mode by default.
@@ -192,22 +141,6 @@ class EnvironmentSettings(object):
"""
return self._j_environment_settings.getBuiltInDatabaseName()
- def is_blink_planner(self) -> bool:
- """
- Tells if :class:`~pyflink.table.TableEnvironment` should work in a blink or old
- planner.
-
- .. note:: The old planner has been removed in Flink 1.14. Since there is only one
- planner left (previously called the 'blink' planner), this method is
- obsolete and will be removed in future versions.
-
- :return: True if the TableEnvironment should work in a blink planner, false otherwise.
- """
- warnings.warn(
- "Deprecated in 1.14. There is only one planner anymore.",
- DeprecationWarning)
- return self._j_environment_settings.isBlinkPlanner()
-
def is_streaming_mode(self) -> bool:
"""
Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
@@ -221,9 +154,6 @@ class EnvironmentSettings(object):
"""
Convert to `pyflink.common.Configuration`.
- It sets the `table.planner` and `execution.runtime-mode` according to the current
- EnvironmentSetting.
-
:return: Configuration with specified value.
"""
return Configuration(j_configuration=self._j_environment_settings.toConfiguration())
@@ -233,9 +163,6 @@ class EnvironmentSettings(object):
"""
Creates a builder for creating an instance of EnvironmentSettings.
- By default, it does not specify a required planner and will use the one that is available
- on the classpath via discovery.
-
:return: A builder of EnvironmentSettings.
"""
return EnvironmentSettings.Builder()
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 3805499..135cf8f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -101,11 +101,6 @@ public class ExecutionContext {
// checks the value of RUNTIME_MODE
EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig);
- if (!settings.isBlinkPlanner()) {
- throw new TableException(
- "The old planner is not supported anymore. Please update to new default planner.");
- }
-
TableConfig tableConfig = new TableConfig();
tableConfig.addConfiguration(flinkConfig);
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index ade8fac..cc9a32b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
-import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER;
/**
* Defines all parameters that initialize a table environment. Those parameters are used only during
@@ -150,19 +149,6 @@ public class EnvironmentSettings {
configuration.get(RUNTIME_MODE), RUNTIME_MODE.key()));
}
- switch (configuration.get(TABLE_PLANNER)) {
- case BLINK:
- builder.useBlinkPlanner();
- break;
- case OLD:
- builder.useOldPlanner();
- break;
- default:
- throw new IllegalArgumentException(
- String.format(
- "Unrecognized value '%s' for option '%s'.",
- configuration.get(TABLE_PLANNER), TABLE_PLANNER.key()));
- }
return builder.build();
}
@@ -170,7 +156,6 @@ public class EnvironmentSettings {
public Configuration toConfiguration() {
Configuration configuration = new Configuration();
configuration.set(RUNTIME_MODE, isStreamingMode() ? STREAMING : BATCH);
- configuration.set(TABLE_PLANNER, PlannerType.BLINK);
return configuration;
}
@@ -195,18 +180,6 @@ public class EnvironmentSettings {
return isStreamingMode;
}
- /**
- * Tells if the {@link TableEnvironment} should work in the blink planner or old planner.
- *
- * @deprecated The old planner has been removed in Flink 1.14. Since there is only one planner
- * left (previously called the 'blink' planner), this method is obsolete and will be removed
- * in future versions.
- */
- @Deprecated
- public boolean isBlinkPlanner() {
- return true;
- }
-
/** Returns the identifier of the {@link Planner} to be used. */
@Internal
public String getPlanner() {
@@ -229,49 +202,6 @@ public class EnvironmentSettings {
private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
private boolean isStreamingMode = true;
- /**
- * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
- * planner left (previously called the 'blink' planner), this setting will throw an
- * exception.
- */
- @Deprecated
- public Builder useOldPlanner() {
- throw new TableException(
- "The old planner has been removed in Flink 1.14. "
- + "Please upgrade your table program to use the default "
- + "planner (previously called the 'blink' planner).");
- }
-
- /**
- * Sets the Blink planner as the required module.
- *
- * <p>This is the default behavior.
- *
- * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
- * planner left (previously called the 'blink' planner), this setting is obsolete and
- * will be removed in future versions.
- */
- @Deprecated
- public Builder useBlinkPlanner() {
- return this;
- }
-
- /**
- * Does not set a planner requirement explicitly.
- *
- * <p>A planner will be discovered automatically, if there is only one planner available.
- *
- * <p>By default, {@link #useBlinkPlanner()} is enabled.
- *
- * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
- * planner left (previously called the 'blink' planner), this setting is obsolete and
- * will be removed in future versions.
- */
- @Deprecated
- public Builder useAnyPlanner() {
- return this;
- }
-
/** Sets that the components should work in a batch mode. Streaming mode by default. */
public Builder inBatchMode() {
this.isStreamingMode = false;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlannerType.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlannerType.java
deleted file mode 100644
index 3fa84fd..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlannerType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.delegation.Planner;
-
-/**
- * Determine the type of the {@link Planner}. Except for the optimization, the different planner
- * also differs in the time semantic and so on.
- *
- * @deprecated The old planner has been removed in Flink 1.14. Since there is only one planner left
- * (previously called the 'blink' planner), this class is obsolete and will be removed in future
- * versions.
- */
-@PublicEvolving
-@Deprecated
-public enum PlannerType {
- /** Blink planner is the up-to-date planner in Flink. */
- BLINK,
-
- /** Old planner is used before. It will not be maintained in the future. */
- OLD
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
index 72c89f8..8601f38 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.Description;
import org.apache.flink.configuration.description.InlineElement;
-import org.apache.flink.table.api.PlannerType;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.catalog.Catalog;
@@ -42,17 +41,6 @@ public class TableConfigOptions {
private TableConfigOptions() {}
@Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
- @Deprecated
- public static final ConfigOption<PlannerType> TABLE_PLANNER =
- key("table.planner")
- .enumType(PlannerType.class)
- .defaultValue(PlannerType.BLINK)
- .withDescription(
- "The old planner has been removed in Flink 1.14. "
- + "Since there is only one planner left (previously called the 'blink' planner), "
- + "this option is obsolete and will be removed in future versions.");
-
- @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
public static final ConfigOption<Boolean> TABLE_DML_SYNC =
key("table.dml-sync")
.booleanType()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 9194de1..8ea4b20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -483,13 +483,6 @@ abstract class PlannerBase(
*/
protected def validateAndOverrideConfiguration(): Unit = {
val configuration = tableConfig.getConfiguration
- if (!configuration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK)) {
- throw new IllegalArgumentException(
- "Mismatch between configured planner and actual planner. " +
- "Currently, the 'table.planner' can only be set when instantiating the " +
- "table environment. Subsequent changes are not supported. " +
- "Please instantiate a new TableEnvironment if necessary.");
- }
// Add query start time to TableConfig, these config are used internally,
// these configs will be used by temporal functions like CURRENT_TIMESTAMP,LOCALTIMESTAMP.
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 25048d8..ffec38f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -85,25 +85,6 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
}
@Test
- def testSetPlannerType: Unit = {
- tEnv.getConfig.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD)
-
- TestTableSourceSinks.createCsvTemporarySinkTable(
- tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink1")
-
-
- thrown.expect(classOf[IllegalArgumentException])
- thrown.expectMessage(
- "Mismatch between configured planner and actual planner. " +
- "Currently, the 'table.planner' can only be set " +
- "when instantiating the table environment. Subsequent changes are not supported. " +
- "Please instantiate a new TableEnvironment if necessary."
- )
-
- tEnv.executeSql("insert into MySink1 select first from MyTable")
- }
-
- @Test
def testSetExecutionMode(): Unit = {
if (isStreaming) {
tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)