You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/26 08:45:30 UTC

[flink] branch master updated: [FLINK-28628][table] Introduce Operation Execution Plugin (#20247)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c7af66be7cb [FLINK-28628][table] Introduce Operation Execution Plugin (#20247)
c7af66be7cb is described below

commit c7af66be7cb5c8148dd8bd258267bb7d2550f10e
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Jul 26 16:45:20 2022 +0800

    [FLINK-28628][table] Introduce Operation Execution Plugin (#20247)
---
 ...eParserFactory.java => HiveDialectFactory.java} | 10 ++++--
 .../delegation/hive/HiveOperationExecutor.java     | 36 +++++++++++++++++++
 .../org.apache.flink.table.factories.Factory       |  2 +-
 .../flink/connectors/hive/HiveDialectITCase.java   | 19 ++++++++--
 .../table/api/internal/TableEnvironmentImpl.java   | 16 ++++++++-
 .../delegation/ExtendedOperationExecutor.java      | 42 ++++++++++++++++++++++
 .../org/apache/flink/table/delegation/Planner.java |  8 +++++
 .../org/apache/flink/table/utils/PlannerMock.java  |  7 ++++
 ...rserFactory.java => DefaultDialectFactory.java} |  2 +-
 .../{ParserFactory.java => DialectFactory.java}    | 39 ++++++++++++++++++--
 .../org.apache.flink.table.factories.Factory       |  2 +-
 .../table/planner/delegation/PlannerBase.scala     | 41 ++++++++++++++-------
 12 files changed, 200 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java
similarity index 83%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java
index 92dce590d5b..10b8dce8f9a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java
@@ -20,14 +20,15 @@ package org.apache.flink.table.planner.delegation.hive;
 
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.planner.delegation.ParserFactory;
+import org.apache.flink.table.planner.delegation.DialectFactory;
 
 import java.util.Collections;
 import java.util.Set;
 
 /** A Parser factory that creates {@link HiveParser}. */
-public class HiveParserFactory implements ParserFactory {
+public class HiveDialectFactory implements DialectFactory {
 
     @Override
     public String factoryIdentifier() {
@@ -52,4 +53,9 @@ public class HiveParserFactory implements ParserFactory {
                 context.getPlannerContext()::createCalciteParser,
                 context.getPlannerContext());
     }
+
+    @Override
+    public ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
+        return new HiveOperationExecutor();
+    }
 }
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
new file mode 100644
index 00000000000..2eb02ba27a6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
+
+/**
+ * A Hive's operation executor used to execute operation in custom way instead of Flink's
+ * implementation.
+ */
+public class HiveOperationExecutor implements ExtendedOperationExecutor {
+    @Override
+    public Optional<TableResultInternal> executeOperation(Operation operation) {
+        return Optional.empty();
+    }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 929abe11d6e..dac2f67e247 100644
--- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,4 +16,4 @@
 org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
 org.apache.flink.table.endpoint.hive.HiveServer2EndpointFactory
 org.apache.flink.table.module.hive.HiveModuleFactory
-org.apache.flink.table.planner.delegation.hive.HiveParserFactory
+org.apache.flink.table.planner.delegation.hive.HiveDialectFactory
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index a1e868ab111..a861cde8114 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.hive.HiveCatalog;
 import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.operations.DescribeTableOperation;
 import org.apache.flink.table.operations.command.ClearOperation;
@@ -41,6 +43,7 @@ import org.apache.flink.table.operations.command.HelpOperation;
 import org.apache.flink.table.operations.command.QuitOperation;
 import org.apache.flink.table.operations.command.ResetOperation;
 import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.planner.delegation.hive.HiveOperationExecutor;
 import org.apache.flink.table.planner.delegation.hive.HiveParser;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 import org.apache.flink.types.Row;
@@ -114,18 +117,30 @@ public class HiveDialectITCase {
     }
 
     @Test
-    public void testPluggableParser() {
+    public void testPluggableDialect() {
         TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
         Parser parser = tableEnvInternal.getParser();
         // hive dialect should use HiveParser
         assertThat(parser).isInstanceOf(HiveParser.class);
-        // execute some sql and verify the parser instance is reused
+        ExtendedOperationExecutor operationExecutor =
+                ((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor();
+        // hive dialect should use HiveOperationExecutor
+        assertThat(operationExecutor).isInstanceOf(HiveOperationExecutor.class);
+        // execute some sql and verify the parser/operation executor instance is reused
         tableEnvInternal.executeSql("show databases");
         assertThat(tableEnvInternal.getParser()).isSameAs(parser);
+        assertThat(((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor())
+                .isSameAs(operationExecutor);
         // switching dialect will result in a new parser
         tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT);
         assertThat(tableEnvInternal.getParser().getClass().getName())
                 .isNotEqualTo(parser.getClass().getName());
+        assertThat(
+                        ((TableEnvironmentImpl) tableEnvInternal)
+                                .getExtendedOperationExecutor()
+                                .getClass()
+                                .getName())
+                .isNotEqualTo(operationExecutor.getClass().getName());
     }
 
     @Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 68b97a0d279..d4e10108d38 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -73,6 +73,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.InternalPlan;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
@@ -703,7 +704,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
             throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
         }
 
-        return executeInternal(operations.get(0));
+        Operation operation = operations.get(0);
+        return executeInternal(operation);
     }
 
     @Override
@@ -872,6 +874,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
 
     @Override
     public TableResultInternal executeInternal(Operation operation) {
+        // try to use extended operation executor to execute the operation
+        Optional<TableResultInternal> tableResult =
+                getExtendedOperationExecutor().executeOperation(operation);
+        // if the extended operation executor return non-empty result, return it
+        if (tableResult.isPresent()) {
+            return tableResult.get();
+        }
+        // otherwise, fall back to internal implementation
         if (operation instanceof ModifyOperation) {
             return executeInternal(Collections.singletonList((ModifyOperation) operation));
         } else if (operation instanceof StatementSetOperation) {
@@ -1639,6 +1649,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
         return getPlanner().getParser();
     }
 
+    public ExtendedOperationExecutor getExtendedOperationExecutor() {
+        return getPlanner().getExtendedOperationExecutor();
+    }
+
     @Override
     public CatalogManager getCatalogManager() {
         return catalogManager;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java
new file mode 100644
index 00000000000..a3a0b2189c3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
+
+/**
+ * An extended operation executor which provides method for executing operation. External pluggable
+ * dialect can implement this interface to execute operation in its own way instead of using Flink's
+ * own implementation for operation execution.
+ */
+@Internal
+public interface ExtendedOperationExecutor {
+
+    /**
+     * Execute the given operation and return the execution result. This method will delegate
+     * Flink's own operation execution.
+     *
+     * <p>If return Optional.empty(), the operation will then fall to Flink's operation execution.
+     */
+    Optional<TableResultInternal> executeOperation(Operation operation);
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index b9e33df8bf7..ee969a19cc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -62,6 +62,14 @@ public interface Planner {
      */
     Parser getParser();
 
+    /**
+     * Retrieves a {@link ExtendedOperationExecutor} that provides method for executing operation in
+     * a custom way.
+     *
+     * @return initialized {@link ExtendedOperationExecutor}
+     */
+    ExtendedOperationExecutor getExtendedOperationExecutor();
+
     /**
      * Converts a relational tree of {@link ModifyOperation}s into a set of runnable {@link
      * Transformation}s.
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index a8f45523458..c45a184eb59 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.utils;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.table.api.ExplainDetail;
 import org.apache.flink.table.api.PlanReference;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.InternalPlan;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
@@ -29,6 +30,7 @@ import org.apache.flink.table.operations.Operation;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 
 /** Mocking {@link Planner} for tests. */
 public class PlannerMock implements Planner {
@@ -38,6 +40,11 @@ public class PlannerMock implements Planner {
         return new ParserMock();
     }
 
+    @Override
+    public ExtendedOperationExecutor getExtendedOperationExecutor() {
+        return (operation) -> Optional.empty();
+    }
+
     @Override
     public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
         return null;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java
index e75efbe3432..512a5f428e8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java
@@ -26,7 +26,7 @@ import java.util.Collections;
 import java.util.Set;
 
 /** A Parser factory that creates {@link ParserImpl}. */
-public class DefaultParserFactory implements ParserFactory {
+public class DefaultDialectFactory implements DialectFactory {
 
     @Override
     public String factoryIdentifier() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java
similarity index 59%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java
index 9c1ef0cd0e1..6deebab3dce 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java
@@ -20,37 +20,52 @@ package org.apache.flink.table.planner.delegation;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.internal.TableResultInternal;
 import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
 import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
 
 /**
- * Factory that creates {@link Parser}.
+ * Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}.
  *
  * <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
  * TableConfigOptions#TABLE_SQL_DIALECT}.
  */
 @Internal
-public interface ParserFactory extends Factory {
+public interface DialectFactory extends Factory {
 
     /** Creates a new parser. */
     Parser create(Context context);
 
+    default ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
+        return new EmptyOperationExecutor();
+    }
+
     /** Context provided when a parser is created. */
     interface Context {
         CatalogManager getCatalogManager();
 
         PlannerContext getPlannerContext();
+
+        Executor getExecutor();
     }
 
     /** Default implementation for {@link Context}. */
     class DefaultParserContext implements Context {
         private final CatalogManager catalogManager;
         private final PlannerContext plannerContext;
+        private final Executor executor;
 
-        public DefaultParserContext(CatalogManager catalogManager, PlannerContext plannerContext) {
+        public DefaultParserContext(
+                CatalogManager catalogManager, PlannerContext plannerContext, Executor executor) {
             this.catalogManager = catalogManager;
             this.plannerContext = plannerContext;
+            this.executor = executor;
         }
 
         @Override
@@ -62,5 +77,23 @@ public interface ParserFactory extends Factory {
         public PlannerContext getPlannerContext() {
             return plannerContext;
         }
+
+        @Override
+        public Executor getExecutor() {
+            return executor;
+        }
+    }
+
+    /**
+     * Default implementation for {@link ExtendedOperationExecutor} that doesn't extend any
+     * operation behavior but forward all operations to the Flink planner.
+     */
+    class EmptyOperationExecutor implements ExtendedOperationExecutor {
+
+        @Override
+        public Optional<TableResultInternal> executeOperation(Operation operation) {
+            // return empty so that it'll use Flink's own implementation for operation execution.
+            return Optional.empty();
+        }
     }
 }
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index ac5f9b6bb17..6121bb963fc 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,5 +14,5 @@
 # limitations under the License.
 
 org.apache.flink.table.planner.delegation.DefaultExecutorFactory
-org.apache.flink.table.planner.delegation.DefaultParserFactory
+org.apache.flink.table.planner.delegation.DefaultDialectFactory
 org.apache.flink.table.planner.delegation.DefaultPlannerFactory
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 1c8c6b7b2ff..8d00017f46e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.catalog._
 import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
-import org.apache.flink.table.delegation.{Executor, Parser, Planner}
+import org.apache.flink.table.delegation.{Executor, ExtendedOperationExecutor, Parser, Planner}
 import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil}
 import org.apache.flink.table.module.{Module, ModuleManager}
 import org.apache.flink.table.operations._
@@ -36,7 +36,6 @@ import org.apache.flink.table.planner.calcite._
 import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils
 import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
-import org.apache.flink.table.planner.delegation.ParserFactory.DefaultParserContext
 import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
 import org.apache.flink.table.planner.hint.FlinkHints
 import org.apache.flink.table.planner.operations.PlannerQueryOperation
@@ -56,6 +55,7 @@ import org.apache.flink.table.sinks.TableSink
 import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
 
 import _root_.scala.collection.JavaConversions._
+import DialectFactory.DefaultParserContext
 import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
 import org.apache.calcite.plan.{RelTrait, RelTraitDef}
 import org.apache.calcite.rel.RelNode
@@ -100,7 +100,9 @@ abstract class PlannerBase(
   // temporary utility until we don't use planner expressions anymore
   functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
 
+  private var dialectFactory: DialectFactory = _
   private var parser: Parser = _
+  private var extendedOperationExecutor: ExtendedOperationExecutor = _
   private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
 
   @VisibleForTesting
@@ -147,25 +149,38 @@ abstract class PlannerBase(
     executor.asInstanceOf[DefaultExecutor].getExecutionEnvironment
   }
 
-  def createNewParser: Parser = {
-    val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase
-    val parserFactory = FactoryUtil.discoverFactory(
-      getClass.getClassLoader,
-      classOf[ParserFactory],
-      factoryIdentifier)
-
-    val context = new DefaultParserContext(catalogManager, plannerContext)
-    parserFactory.create(context)
+  def getDialectFactory: DialectFactory = {
+    if (dialectFactory == null || getTableConfig.getSqlDialect != currentDialect) {
+      val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase
+      dialectFactory = FactoryUtil.discoverFactory(
+        getClass.getClassLoader,
+        classOf[DialectFactory],
+        factoryIdentifier)
+      currentDialect = getTableConfig.getSqlDialect
+      parser = null
+      extendedOperationExecutor = null
+    }
+    dialectFactory
   }
 
   override def getParser: Parser = {
     if (parser == null || getTableConfig.getSqlDialect != currentDialect) {
-      parser = createNewParser
-      currentDialect = getTableConfig.getSqlDialect
+      dialectFactory = getDialectFactory
+      parser =
+        dialectFactory.create(new DefaultParserContext(catalogManager, plannerContext, executor))
     }
     parser
   }
 
+  override def getExtendedOperationExecutor: ExtendedOperationExecutor = {
+    if (extendedOperationExecutor == null || getTableConfig.getSqlDialect != currentDialect) {
+      dialectFactory = getDialectFactory
+      extendedOperationExecutor = dialectFactory.createExtendedOperationExecutor(
+        new DefaultParserContext(catalogManager, plannerContext, executor))
+    }
+    extendedOperationExecutor
+  }
+
   override def translate(
       modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
     beforeTranslation()