You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2022/07/26 08:45:30 UTC
[flink] branch master updated: [FLINK-28628][table] Introduce Operation Execution Plugin (#20247)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c7af66be7cb [FLINK-28628][table] Introduce Operation Execution Plugin (#20247)
c7af66be7cb is described below
commit c7af66be7cb5c8148dd8bd258267bb7d2550f10e
Author: yuxia Luo <lu...@alumni.sjtu.edu.cn>
AuthorDate: Tue Jul 26 16:45:20 2022 +0800
[FLINK-28628][table] Introduce Operation Execution Plugin (#20247)
---
...eParserFactory.java => HiveDialectFactory.java} | 10 ++++--
.../delegation/hive/HiveOperationExecutor.java | 36 +++++++++++++++++++
.../org.apache.flink.table.factories.Factory | 2 +-
.../flink/connectors/hive/HiveDialectITCase.java | 19 ++++++++--
.../table/api/internal/TableEnvironmentImpl.java | 16 ++++++++-
.../delegation/ExtendedOperationExecutor.java | 42 ++++++++++++++++++++++
.../org/apache/flink/table/delegation/Planner.java | 8 +++++
.../org/apache/flink/table/utils/PlannerMock.java | 7 ++++
...rserFactory.java => DefaultDialectFactory.java} | 2 +-
.../{ParserFactory.java => DialectFactory.java} | 39 ++++++++++++++++++--
.../org.apache.flink.table.factories.Factory | 2 +-
.../table/planner/delegation/PlannerBase.scala | 41 ++++++++++++++-------
12 files changed, 200 insertions(+), 24 deletions(-)
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java
similarity index 83%
rename from flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
rename to flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java
index 92dce590d5b..10b8dce8f9a 100644
--- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveParserFactory.java
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveDialectFactory.java
@@ -20,14 +20,15 @@ package org.apache.flink.table.planner.delegation.hive;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.planner.delegation.ParserFactory;
+import org.apache.flink.table.planner.delegation.DialectFactory;
import java.util.Collections;
import java.util.Set;
/** A Parser factory that creates {@link HiveParser}. */
-public class HiveParserFactory implements ParserFactory {
+public class HiveDialectFactory implements DialectFactory {
@Override
public String factoryIdentifier() {
@@ -52,4 +53,9 @@ public class HiveParserFactory implements ParserFactory {
context.getPlannerContext()::createCalciteParser,
context.getPlannerContext());
}
+
+ @Override
+ public ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
+ return new HiveOperationExecutor();
+ }
}
diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
new file mode 100644
index 00000000000..2eb02ba27a6
--- /dev/null
+++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/planner/delegation/hive/HiveOperationExecutor.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.delegation.hive;
+
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
+
+/**
+ * A Hive's operation executor used to execute operation in custom way instead of Flink's
+ * implementation.
+ */
+public class HiveOperationExecutor implements ExtendedOperationExecutor {
+ @Override
+ public Optional<TableResultInternal> executeOperation(Operation operation) {
+ return Optional.empty();
+ }
+}
diff --git a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index 929abe11d6e..dac2f67e247 100644
--- a/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-connectors/flink-connector-hive/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -16,4 +16,4 @@
org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory
org.apache.flink.table.endpoint.hive.HiveServer2EndpointFactory
org.apache.flink.table.module.hive.HiveModuleFactory
-org.apache.flink.table.planner.delegation.hive.HiveParserFactory
+org.apache.flink.table.planner.delegation.hive.HiveDialectFactory
diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
index a1e868ab111..a861cde8114 100644
--- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
+++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveDialectITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
@@ -34,6 +35,7 @@ import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.command.ClearOperation;
@@ -41,6 +43,7 @@ import org.apache.flink.table.operations.command.HelpOperation;
import org.apache.flink.table.operations.command.QuitOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.planner.delegation.hive.HiveOperationExecutor;
import org.apache.flink.table.planner.delegation.hive.HiveParser;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.flink.types.Row;
@@ -114,18 +117,30 @@ public class HiveDialectITCase {
}
@Test
- public void testPluggableParser() {
+ public void testPluggableDialect() {
TableEnvironmentInternal tableEnvInternal = (TableEnvironmentInternal) tableEnv;
Parser parser = tableEnvInternal.getParser();
// hive dialect should use HiveParser
assertThat(parser).isInstanceOf(HiveParser.class);
- // execute some sql and verify the parser instance is reused
+ ExtendedOperationExecutor operationExecutor =
+ ((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor();
+ // hive dialect should use HiveOperationExecutor
+ assertThat(operationExecutor).isInstanceOf(HiveOperationExecutor.class);
+ // execute some sql and verify the parser/operation executor instance is reused
tableEnvInternal.executeSql("show databases");
assertThat(tableEnvInternal.getParser()).isSameAs(parser);
+ assertThat(((TableEnvironmentImpl) tableEnvInternal).getExtendedOperationExecutor())
+ .isSameAs(operationExecutor);
// switching dialect will result in a new parser
tableEnvInternal.getConfig().setSqlDialect(SqlDialect.DEFAULT);
assertThat(tableEnvInternal.getParser().getClass().getName())
.isNotEqualTo(parser.getClass().getName());
+ assertThat(
+ ((TableEnvironmentImpl) tableEnvInternal)
+ .getExtendedOperationExecutor()
+ .getClass()
+ .getName())
+ .isNotEqualTo(operationExecutor.getClass().getName());
}
@Test
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 68b97a0d279..d4e10108d38 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -73,6 +73,7 @@ import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
@@ -703,7 +704,8 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
throw new TableException(UNSUPPORTED_QUERY_IN_EXECUTE_SQL_MSG);
}
- return executeInternal(operations.get(0));
+ Operation operation = operations.get(0);
+ return executeInternal(operation);
}
@Override
@@ -872,6 +874,14 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
@Override
public TableResultInternal executeInternal(Operation operation) {
+ // try to use extended operation executor to execute the operation
+ Optional<TableResultInternal> tableResult =
+ getExtendedOperationExecutor().executeOperation(operation);
+ // if the extended operation executor return non-empty result, return it
+ if (tableResult.isPresent()) {
+ return tableResult.get();
+ }
+ // otherwise, fall back to internal implementation
if (operation instanceof ModifyOperation) {
return executeInternal(Collections.singletonList((ModifyOperation) operation));
} else if (operation instanceof StatementSetOperation) {
@@ -1639,6 +1649,10 @@ public class TableEnvironmentImpl implements TableEnvironmentInternal {
return getPlanner().getParser();
}
+ public ExtendedOperationExecutor getExtendedOperationExecutor() {
+ return getPlanner().getExtendedOperationExecutor();
+ }
+
@Override
public CatalogManager getCatalogManager() {
return catalogManager;
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java
new file mode 100644
index 00000000000..a3a0b2189c3
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/ExtendedOperationExecutor.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.delegation;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.api.internal.TableResultInternal;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
+
+/**
+ * An extended operation executor which provides method for executing operation. External pluggable
+ * dialect can implement this interface to execute operation in its own way instead of using Flink's
+ * own implementation for operation execution.
+ */
+@Internal
+public interface ExtendedOperationExecutor {
+
+ /**
+ * Execute the given operation and return the execution result. This method will delegate
+ * Flink's own operation execution.
+ *
+ * <p>If return Optional.empty(), the operation will then fall to Flink's operation execution.
+ */
+ Optional<TableResultInternal> executeOperation(Operation operation);
+}
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
index b9e33df8bf7..ee969a19cc3 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/delegation/Planner.java
@@ -62,6 +62,14 @@ public interface Planner {
*/
Parser getParser();
+ /**
+ * Retrieves a {@link ExtendedOperationExecutor} that provides method for executing operation in
+ * a custom way.
+ *
+ * @return initialized {@link ExtendedOperationExecutor}
+ */
+ ExtendedOperationExecutor getExtendedOperationExecutor();
+
/**
* Converts a relational tree of {@link ModifyOperation}s into a set of runnable {@link
* Transformation}s.
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
index a8f45523458..c45a184eb59 100644
--- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/utils/PlannerMock.java
@@ -21,6 +21,7 @@ package org.apache.flink.table.utils;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.ExplainDetail;
import org.apache.flink.table.api.PlanReference;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.InternalPlan;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
@@ -29,6 +30,7 @@ import org.apache.flink.table.operations.Operation;
import java.io.IOException;
import java.util.List;
+import java.util.Optional;
/** Mocking {@link Planner} for tests. */
public class PlannerMock implements Planner {
@@ -38,6 +40,11 @@ public class PlannerMock implements Planner {
return new ParserMock();
}
+ @Override
+ public ExtendedOperationExecutor getExtendedOperationExecutor() {
+ return (operation) -> Optional.empty();
+ }
+
@Override
public List<Transformation<?>> translate(List<ModifyOperation> modifyOperations) {
return null;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java
similarity index 96%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java
index e75efbe3432..512a5f428e8 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultParserFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DefaultDialectFactory.java
@@ -26,7 +26,7 @@ import java.util.Collections;
import java.util.Set;
/** A Parser factory that creates {@link ParserImpl}. */
-public class DefaultParserFactory implements ParserFactory {
+public class DefaultDialectFactory implements DialectFactory {
@Override
public String factoryIdentifier() {
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java
similarity index 59%
rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java
rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java
index 9c1ef0cd0e1..6deebab3dce 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/ParserFactory.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/delegation/DialectFactory.java
@@ -20,37 +20,52 @@ package org.apache.flink.table.planner.delegation;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.config.TableConfigOptions;
+import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExtendedOperationExecutor;
import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.factories.Factory;
+import org.apache.flink.table.operations.Operation;
+
+import java.util.Optional;
/**
- * Factory that creates {@link Parser}.
+ * Factory that creates {@link Parser} and {@link ExtendedOperationExecutor}.
*
* <p>The {@link #factoryIdentifier()} is identified by matching it against {@link
* TableConfigOptions#TABLE_SQL_DIALECT}.
*/
@Internal
-public interface ParserFactory extends Factory {
+public interface DialectFactory extends Factory {
/** Creates a new parser. */
Parser create(Context context);
+ default ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
+ return new EmptyOperationExecutor();
+ }
+
/** Context provided when a parser is created. */
interface Context {
CatalogManager getCatalogManager();
PlannerContext getPlannerContext();
+
+ Executor getExecutor();
}
/** Default implementation for {@link Context}. */
class DefaultParserContext implements Context {
private final CatalogManager catalogManager;
private final PlannerContext plannerContext;
+ private final Executor executor;
- public DefaultParserContext(CatalogManager catalogManager, PlannerContext plannerContext) {
+ public DefaultParserContext(
+ CatalogManager catalogManager, PlannerContext plannerContext, Executor executor) {
this.catalogManager = catalogManager;
this.plannerContext = plannerContext;
+ this.executor = executor;
}
@Override
@@ -62,5 +77,23 @@ public interface ParserFactory extends Factory {
public PlannerContext getPlannerContext() {
return plannerContext;
}
+
+ @Override
+ public Executor getExecutor() {
+ return executor;
+ }
+ }
+
+ /**
+ * Default implementation for {@link ExtendedOperationExecutor} that doesn't extend any
+ * operation behavior but forward all operations to the Flink planner.
+ */
+ class EmptyOperationExecutor implements ExtendedOperationExecutor {
+
+ @Override
+ public Optional<TableResultInternal> executeOperation(Operation operation) {
+ // return empty so that it'll use Flink's own implementation for operation execution.
+ return Optional.empty();
+ }
}
}
diff --git a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
index ac5f9b6bb17..6121bb963fc 100644
--- a/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
+++ b/flink-table/flink-table-planner/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -14,5 +14,5 @@
# limitations under the License.
org.apache.flink.table.planner.delegation.DefaultExecutorFactory
-org.apache.flink.table.planner.delegation.DefaultParserFactory
+org.apache.flink.table.planner.delegation.DefaultDialectFactory
org.apache.flink.table.planner.delegation.DefaultPlannerFactory
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index 1c8c6b7b2ff..8d00017f46e 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.catalog._
import org.apache.flink.table.catalog.ManagedTableListener.isManagedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
-import org.apache.flink.table.delegation.{Executor, Parser, Planner}
+import org.apache.flink.table.delegation.{Executor, ExtendedOperationExecutor, Parser, Planner}
import org.apache.flink.table.factories.{DynamicTableSinkFactory, FactoryUtil, TableFactoryUtil}
import org.apache.flink.table.module.{Module, ModuleManager}
import org.apache.flink.table.operations._
@@ -36,7 +36,6 @@ import org.apache.flink.table.planner.calcite._
import org.apache.flink.table.planner.catalog.CatalogManagerCalciteSchema
import org.apache.flink.table.planner.connectors.DynamicSinkUtils
import org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast
-import org.apache.flink.table.planner.delegation.ParserFactory.DefaultParserContext
import org.apache.flink.table.planner.expressions.PlannerTypeInferenceUtilImpl
import org.apache.flink.table.planner.hint.FlinkHints
import org.apache.flink.table.planner.operations.PlannerQueryOperation
@@ -56,6 +55,7 @@ import org.apache.flink.table.sinks.TableSink
import org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter
import _root_.scala.collection.JavaConversions._
+import DialectFactory.DefaultParserContext
import org.apache.calcite.jdbc.CalciteSchemaBuilder.asRootSchema
import org.apache.calcite.plan.{RelTrait, RelTraitDef}
import org.apache.calcite.rel.RelNode
@@ -100,7 +100,9 @@ abstract class PlannerBase(
// temporary utility until we don't use planner expressions anymore
functionCatalog.setPlannerTypeInferenceUtil(PlannerTypeInferenceUtilImpl.INSTANCE)
+ private var dialectFactory: DialectFactory = _
private var parser: Parser = _
+ private var extendedOperationExecutor: ExtendedOperationExecutor = _
private var currentDialect: SqlDialect = getTableConfig.getSqlDialect
@VisibleForTesting
@@ -147,25 +149,38 @@ abstract class PlannerBase(
executor.asInstanceOf[DefaultExecutor].getExecutionEnvironment
}
- def createNewParser: Parser = {
- val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase
- val parserFactory = FactoryUtil.discoverFactory(
- getClass.getClassLoader,
- classOf[ParserFactory],
- factoryIdentifier)
-
- val context = new DefaultParserContext(catalogManager, plannerContext)
- parserFactory.create(context)
+ def getDialectFactory: DialectFactory = {
+ if (dialectFactory == null || getTableConfig.getSqlDialect != currentDialect) {
+ val factoryIdentifier = getTableConfig.getSqlDialect.name().toLowerCase
+ dialectFactory = FactoryUtil.discoverFactory(
+ getClass.getClassLoader,
+ classOf[DialectFactory],
+ factoryIdentifier)
+ currentDialect = getTableConfig.getSqlDialect
+ parser = null
+ extendedOperationExecutor = null
+ }
+ dialectFactory
}
override def getParser: Parser = {
if (parser == null || getTableConfig.getSqlDialect != currentDialect) {
- parser = createNewParser
- currentDialect = getTableConfig.getSqlDialect
+ dialectFactory = getDialectFactory
+ parser =
+ dialectFactory.create(new DefaultParserContext(catalogManager, plannerContext, executor))
}
parser
}
+ override def getExtendedOperationExecutor: ExtendedOperationExecutor = {
+ if (extendedOperationExecutor == null || getTableConfig.getSqlDialect != currentDialect) {
+ dialectFactory = getDialectFactory
+ extendedOperationExecutor = dialectFactory.createExtendedOperationExecutor(
+ new DefaultParserContext(catalogManager, plannerContext, executor))
+ }
+ extendedOperationExecutor
+ }
+
override def translate(
modifyOperations: util.List[ModifyOperation]): util.List[Transformation[_]] = {
beforeTranslation()