You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2022/03/02 09:58:35 UTC

[flink] branch master updated: [FLINK-26434][table] Remove 'table.planner' config option and remaining Blink references

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 92eb824  [FLINK-26434][table] Remove 'table.planner' config option and remaining Blink references
92eb824 is described below

commit 92eb824ba1e3ac4a6ee28ab4a68e508766236dab
Author: slinkydeveloper <fr...@gmail.com>
AuthorDate: Tue Feb 22 17:00:42 2022 +0100

    [FLINK-26434][table] Remove 'table.planner' config option and remaining Blink references
    
    This closes #18886.
---
 flink-python/pyflink/table/environment_settings.py | 73 ----------------------
 .../client/gateway/context/ExecutionContext.java   |  5 --
 .../flink/table/api/EnvironmentSettings.java       | 70 ---------------------
 .../org/apache/flink/table/api/PlannerType.java    | 40 ------------
 .../flink/table/api/config/TableConfigOptions.java | 12 ----
 .../table/planner/delegation/PlannerBase.scala     |  7 ---
 .../flink/table/api/TableEnvironmentITCase.scala   | 19 ------
 7 files changed, 226 deletions(-)

diff --git a/flink-python/pyflink/table/environment_settings.py b/flink-python/pyflink/table/environment_settings.py
index f4b8d32..578567d 100644
--- a/flink-python/pyflink/table/environment_settings.py
+++ b/flink-python/pyflink/table/environment_settings.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import warnings
 from pyflink.java_gateway import get_gateway
 
 from pyflink.common import Configuration
@@ -51,56 +50,6 @@ class EnvironmentSettings(object):
             gateway = get_gateway()
             self._j_builder = gateway.jvm.EnvironmentSettings.Builder()
 
-        def use_old_planner(self) -> 'EnvironmentSettings.Builder':
-            """
-            .. note:: The old planner has been removed in Flink 1.14. Since there is only one
-                      planner left (previously called the 'blink' planner), this setting will
-                      throw an exception.
-            """
-            warnings.warn(
-                "Deprecated in 1.13. Please update to the new planner (i.e. Blink planner).",
-                DeprecationWarning)
-            self._j_builder = self._j_builder.useOldPlanner()
-            return self
-
-        def use_blink_planner(self) -> 'EnvironmentSettings.Builder':
-            """
-            Sets the Blink planner as the required module.
-
-            This is the default behavior.
-
-            .. note:: The old planner has been removed in Flink 1.14. Since there is only one
-                      planner left (previously called the 'blink' planner), this setting is
-                      obsolete and will be removed in future versions.
-
-            :return: This object.
-            """
-            warnings.warn(
-                "Deprecated in 1.14. A planner declaration is not required anymore.",
-                DeprecationWarning)
-            self._j_builder = self._j_builder.useBlinkPlanner()
-            return self
-
-        def use_any_planner(self) -> 'EnvironmentSettings.Builder':
-            """
-            Does not set a planner requirement explicitly.
-
-            A planner will be discovered automatically, if there is only one planner available.
-
-            By default, :func:`use_blink_planner` is enabled.
-
-            .. note:: The old planner has been removed in Flink 1.14. Since there is only one
-                      planner left (previously called the 'blink' planner), this setting is
-                      obsolete and will be removed in future versions.
-
-            :return: This object.
-            """
-            warnings.warn(
-                "Deprecated in 1.14. A planner declaration is not required anymore.",
-                DeprecationWarning)
-            self._j_builder = self._j_builder.useAnyPlanner()
-            return self
-
         def in_batch_mode(self) -> 'EnvironmentSettings.Builder':
             """
             Sets that the components should work in a batch mode. Streaming mode by default.
@@ -192,22 +141,6 @@ class EnvironmentSettings(object):
         """
         return self._j_environment_settings.getBuiltInDatabaseName()
 
-    def is_blink_planner(self) -> bool:
-        """
-        Tells if :class:`~pyflink.table.TableEnvironment` should work in a blink or old
-        planner.
-
-        .. note:: The old planner has been removed in Flink 1.14. Since there is only one
-                  planner left (previously called the 'blink' planner), this method is
-                  obsolete and will be removed in future versions.
-
-        :return: True if the TableEnvironment should work in a blink planner, false otherwise.
-        """
-        warnings.warn(
-            "Deprecated in 1.14. There is only one planner anymore.",
-            DeprecationWarning)
-        return self._j_environment_settings.isBlinkPlanner()
-
     def is_streaming_mode(self) -> bool:
         """
         Tells if the :class:`~pyflink.table.TableEnvironment` should work in a batch or streaming
@@ -221,9 +154,6 @@ class EnvironmentSettings(object):
         """
         Convert to `pyflink.common.Configuration`.
 
-        It sets the `table.planner` and `execution.runtime-mode` according to the current
-        EnvironmentSetting.
-
         :return: Configuration with specified value.
         """
         return Configuration(j_configuration=self._j_environment_settings.toConfiguration())
@@ -233,9 +163,6 @@ class EnvironmentSettings(object):
         """
         Creates a builder for creating an instance of EnvironmentSettings.
 
-        By default, it does not specify a required planner and will use the one that is available
-        on the classpath via discovery.
-
         :return: A builder of EnvironmentSettings.
         """
         return EnvironmentSettings.Builder()
diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index 3805499..135cf8f 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -101,11 +101,6 @@ public class ExecutionContext {
         // checks the value of RUNTIME_MODE
         EnvironmentSettings settings = EnvironmentSettings.fromConfiguration(flinkConfig);
 
-        if (!settings.isBlinkPlanner()) {
-            throw new TableException(
-                    "The old planner is not supported anymore. Please update to new default planner.");
-        }
-
         TableConfig tableConfig = new TableConfig();
         tableConfig.addConfiguration(flinkConfig);
 
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
index ade8fac..cc9a32b 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java
@@ -33,7 +33,6 @@ import javax.annotation.Nullable;
 import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH;
 import static org.apache.flink.api.common.RuntimeExecutionMode.STREAMING;
 import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
-import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_PLANNER;
 
 /**
  * Defines all parameters that initialize a table environment. Those parameters are used only during
@@ -150,19 +149,6 @@ public class EnvironmentSettings {
                                 configuration.get(RUNTIME_MODE), RUNTIME_MODE.key()));
         }
 
-        switch (configuration.get(TABLE_PLANNER)) {
-            case BLINK:
-                builder.useBlinkPlanner();
-                break;
-            case OLD:
-                builder.useOldPlanner();
-                break;
-            default:
-                throw new IllegalArgumentException(
-                        String.format(
-                                "Unrecognized value '%s' for option '%s'.",
-                                configuration.get(TABLE_PLANNER), TABLE_PLANNER.key()));
-        }
         return builder.build();
     }
 
@@ -170,7 +156,6 @@ public class EnvironmentSettings {
     public Configuration toConfiguration() {
         Configuration configuration = new Configuration();
         configuration.set(RUNTIME_MODE, isStreamingMode() ? STREAMING : BATCH);
-        configuration.set(TABLE_PLANNER, PlannerType.BLINK);
         return configuration;
     }
 
@@ -195,18 +180,6 @@ public class EnvironmentSettings {
         return isStreamingMode;
     }
 
-    /**
-     * Tells if the {@link TableEnvironment} should work in the blink planner or old planner.
-     *
-     * @deprecated The old planner has been removed in Flink 1.14. Since there is only one planner
-     *     left (previously called the 'blink' planner), this method is obsolete and will be removed
-     *     in future versions.
-     */
-    @Deprecated
-    public boolean isBlinkPlanner() {
-        return true;
-    }
-
     /** Returns the identifier of the {@link Planner} to be used. */
     @Internal
     public String getPlanner() {
@@ -229,49 +202,6 @@ public class EnvironmentSettings {
         private String builtInDatabaseName = DEFAULT_BUILTIN_DATABASE;
         private boolean isStreamingMode = true;
 
-        /**
-         * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
-         *     planner left (previously called the 'blink' planner), this setting will throw an
-         *     exception.
-         */
-        @Deprecated
-        public Builder useOldPlanner() {
-            throw new TableException(
-                    "The old planner has been removed in Flink 1.14. "
-                            + "Please upgrade your table program to use the default "
-                            + "planner (previously called the 'blink' planner).");
-        }
-
-        /**
-         * Sets the Blink planner as the required module.
-         *
-         * <p>This is the default behavior.
-         *
-         * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
-         *     planner left (previously called the 'blink' planner), this setting is obsolete and
-         *     will be removed in future versions.
-         */
-        @Deprecated
-        public Builder useBlinkPlanner() {
-            return this;
-        }
-
-        /**
-         * Does not set a planner requirement explicitly.
-         *
-         * <p>A planner will be discovered automatically, if there is only one planner available.
-         *
-         * <p>By default, {@link #useBlinkPlanner()} is enabled.
-         *
-         * @deprecated The old planner has been removed in Flink 1.14. Since there is only one
-         *     planner left (previously called the 'blink' planner), this setting is obsolete and
-         *     will be removed in future versions.
-         */
-        @Deprecated
-        public Builder useAnyPlanner() {
-            return this;
-        }
-
         /** Sets that the components should work in a batch mode. Streaming mode by default. */
         public Builder inBatchMode() {
             this.isStreamingMode = false;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlannerType.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlannerType.java
deleted file mode 100644
index 3fa84fd..0000000
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PlannerType.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.api;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.table.delegation.Planner;
-
-/**
- * Determine the type of the {@link Planner}. Except for the optimization, the different planner
- * also differs in the time semantic and so on.
- *
- * @deprecated The old planner has been removed in Flink 1.14. Since there is only one planner left
- *     (previously called the 'blink' planner), this class is obsolete and will be removed in future
- *     versions.
- */
-@PublicEvolving
-@Deprecated
-public enum PlannerType {
-    /** Blink planner is the up-to-date planner in Flink. */
-    BLINK,
-
-    /** Old planner is used before. It will not be maintained in the future. */
-    OLD
-}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
index 72c89f8..8601f38 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/TableConfigOptions.java
@@ -25,7 +25,6 @@ import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
-import org.apache.flink.table.api.PlannerType;
 import org.apache.flink.table.api.SqlDialect;
 import org.apache.flink.table.catalog.Catalog;
 
@@ -42,17 +41,6 @@ public class TableConfigOptions {
     private TableConfigOptions() {}
 
     @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
-    @Deprecated
-    public static final ConfigOption<PlannerType> TABLE_PLANNER =
-            key("table.planner")
-                    .enumType(PlannerType.class)
-                    .defaultValue(PlannerType.BLINK)
-                    .withDescription(
-                            "The old planner has been removed in Flink 1.14. "
-                                    + "Since there is only one planner left (previously called the 'blink' planner), "
-                                    + "this option is obsolete and will be removed in future versions.");
-
-    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
     public static final ConfigOption<Boolean> TABLE_DML_SYNC =
             key("table.dml-sync")
                     .booleanType()
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 9194de1..8ea4b20 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -483,13 +483,6 @@ abstract class PlannerBase(
    */
   protected def validateAndOverrideConfiguration(): Unit = {
     val configuration = tableConfig.getConfiguration
-    if (!configuration.get(TableConfigOptions.TABLE_PLANNER).equals(PlannerType.BLINK)) {
-      throw new IllegalArgumentException(
-        "Mismatch between configured planner and actual planner. " +
-          "Currently, the 'table.planner' can only be set when instantiating the " +
-          "table environment. Subsequent changes are not supported. " +
-          "Please instantiate a new TableEnvironment if necessary.");
-    }
 
     // Add query start time to TableConfig, these config are used internally,
     // these configs will be used by temporal functions like CURRENT_TIMESTAMP,LOCALTIMESTAMP.
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
index 25048d8..ffec38f 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentITCase.scala
@@ -85,25 +85,6 @@ class TableEnvironmentITCase(tableEnvName: String, isStreaming: Boolean) extends
   }
 
   @Test
-  def testSetPlannerType: Unit = {
-    tEnv.getConfig.set(TableConfigOptions.TABLE_PLANNER, PlannerType.OLD)
-
-    TestTableSourceSinks.createCsvTemporarySinkTable(
-      tEnv, new TableSchema(Array("first"), Array(STRING)), "MySink1")
-
-
-    thrown.expect(classOf[IllegalArgumentException])
-    thrown.expectMessage(
-      "Mismatch between configured planner and actual planner. " +
-        "Currently, the 'table.planner' can only be set " +
-        "when instantiating the table environment. Subsequent changes are not supported. " +
-        "Please instantiate a new TableEnvironment if necessary."
-    )
-
-    tEnv.executeSql("insert into MySink1 select first from MyTable")
-  }
-
-  @Test
   def testSetExecutionMode(): Unit = {
     if (isStreaming) {
       tEnv.getConfig.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH)