Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/12/14 09:40:24 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #1525][FLINK] Implement GetCatalogs operation and do the further initialization
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 53ff56b [KYUUBI #1525][FLINK] Implement GetCatalogs operation and do the further initialization
53ff56b is described below
commit 53ff56be29884b97898bd1363df3704d42d9fa9e
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Dec 14 17:40:11 2021 +0800
[KYUUBI #1525][FLINK] Implement GetCatalogs operation and do the further initialization
…
### _Why are the changes needed?_
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #1529 from yanghua/KYUUBI-1525.
Closes #1525
e166965d [yanghua] Address review suggestion
9929aa29 [yanghua] Remove some comment
eafb200b [yanghua] refactor code
58df34c7 [yanghua] refactor code
675e14e7 [yanghua] pruning unnecessary code
15e97283 [yanghua] pruning unnecessary code
c6d4f549 [yanghua] pruning unnecessary code
b099a455 [yanghua] pruning unnecessary code
316086bb [yanghua] pruning unnecessary code
fe7af8ef [yanghua] pruning unnecessary code
ecf3ccf4 [yanghua] pruning unnecessary code
4c174517 [yanghua] pruning unnecessary code
b9226c3e [yanghua] pruning unnecessary code
fb2f6a62 [yanghua] pruning unnecessary code
21e21a50 [yanghua] pruning unnecessary code
a58076ff [yanghua] pruning unnecessary code
acd24fc5 [yanghua] pruning unnecessary code
333d4275 [yanghua] pruning unnecessary code
7416b3de [yanghua] pruning unnecessary code
84d0db91 [yanghua] pruning unnecessary code and refactor code
a55db41a [yanghua] pruning unnecessary code
b9909308 [yanghua] [KYUUBI #1525] Implement GetCatalogs operation and do the further initialization
Authored-by: yanghua <ya...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
externals/kyuubi-flink-sql-engine/pom.xml | 88 +++++
.../engine/flink/config/EngineEnvironment.java | 81 +++++
.../engine/flink/config/entries/ConfigEntry.java | 68 ++++
.../flink/config/entries/ExecutionEntry.java | 368 +++++++++++++++++++++
.../kyuubi/engine/flink/context/EngineContext.java | 149 +++++++++
.../engine/flink/context/ExecutionContext.java | 306 +++++++++++++++++
.../engine/flink/context/SessionContext.java | 69 ++++
.../kyuubi/engine/flink/result/ColumnInfo.java | 81 +++++
.../kyuubi/engine/flink/result/Constants.java | 25 ++
.../kyuubi/engine/flink/result/OperationUtil.java | 49 +++
.../kyuubi/engine/flink/result/ResultKind.java | 25 ++
.../kyuubi/engine/flink/result/ResultSet.java | 165 +++++++++
.../engine/flink/operation/FlinkOperation.scala | 135 ++++++++
.../flink/operation/FlinkSQLOperationManager.scala | 67 ++++
.../engine/flink/operation/GetCatalogs.scala | 40 +++
.../apache/kyuubi/engine/flink/schema/RowSet.scala | 236 +++++++++++++
.../flink/session/FlinkSQLSessionManager.scala | 41 +++
.../engine/flink/session/FlinkSessionImpl.scala | 39 +++
.../flink/operation/FlinkOperationSuite.scala | 84 +++++
pom.xml | 74 +++++
20 files changed, 2190 insertions(+)
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index b2c49a4..a652b9d 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -45,6 +45,94 @@
<version>${project.version}</version>
</dependency>
+ <!-- flink -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-parser</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- tests -->
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.kyuubi</groupId>
+ <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/EngineEnvironment.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/EngineEnvironment.java
new file mode 100644
index 0000000..20bc0c0
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/EngineEnvironment.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.config;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kyuubi.engine.flink.config.entries.ExecutionEntry;
+
+/**
+ * EngineEnvironment configuration that represents the content of an environment file.
+ * EngineEnvironment files define engine, session, catalogs, tables, execution, and deployment
+ * behavior. An environment might be defined by default or as part of a session. Environments can be
+ * merged or enriched with properties.
+ */
+public class EngineEnvironment {
+
+ public static final String EXECUTION_ENTRY = "execution";
+
+ private ExecutionEntry execution;
+
+ public EngineEnvironment() {
+ this.execution = ExecutionEntry.DEFAULT_INSTANCE;
+ }
+
+ public ExecutionEntry getExecution() {
+ return execution;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder();
+ sb.append("=================== Execution ====================\n");
+ execution.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+ return sb.toString();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Merges two environments. The properties of the first environment might be overwritten by the
+ * second one.
+ */
+ public static EngineEnvironment merge(EngineEnvironment env1, EngineEnvironment env2) {
+ final EngineEnvironment mergedEnv = new EngineEnvironment();
+
+ // merge execution properties
+ mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), env2.getExecution());
+
+ return mergedEnv;
+ }
+
+ public EngineEnvironment clone() {
+ return enrich(this, Collections.emptyMap());
+ }
+
+ /** Enriches an environment with new/modified properties or views and returns the new instance. */
+ public static EngineEnvironment enrich(EngineEnvironment env, Map<String, String> properties) {
+ final EngineEnvironment enrichedEnv = new EngineEnvironment();
+
+ // enrich execution properties
+ enrichedEnv.execution = ExecutionEntry.enrich(env.execution, properties);
+
+ return enrichedEnv;
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ConfigEntry.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ConfigEntry.java
new file mode 100644
index 0000000..59540c4
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ConfigEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.config.entries;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+/**
+ * Describes an environment configuration entry (such as catalogs, tables, functions, views). Config
+ * entries are similar to {@link org.apache.flink.table.descriptors.Descriptor} but apply to SQL
+ * Engine's environment files only.
+ */
+abstract class ConfigEntry {
+
+ protected final DescriptorProperties properties;
+
+ protected ConfigEntry(DescriptorProperties properties) {
+ try {
+ validate(properties);
+ } catch (ValidationException e) {
+ throw new RuntimeException("Invalid configuration entry.", e);
+ }
+
+ this.properties = properties;
+ }
+
+ /** Performs syntactic validation. */
+ protected abstract void validate(DescriptorProperties properties);
+
+ public Map<String, String> asMap() {
+ return properties.asMap();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConfigEntry that = (ConfigEntry) o;
+ return Objects.equals(properties, that.properties);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(properties);
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ExecutionEntry.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ExecutionEntry.java
new file mode 100644
index 0000000..279b72d
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ExecutionEntry.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.config.entries;
+
+import static org.apache.kyuubi.engine.flink.config.EngineEnvironment.EXECUTION_ENTRY;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration of a table program execution. This class parses the `execution` part in an
+ * environment file. The execution describes properties that would usually be defined in the
+ * ExecutionEnvironment/StreamExecutionEnvironment/TableEnvironment or as code in a Flink job.
+ *
+ * <p>All properties of this entry are optional and evaluated lazily. TODO remove the useless
+ * properties
+ */
+public class ExecutionEntry extends ConfigEntry {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutionEntry.class);
+
+ public static final ExecutionEntry DEFAULT_INSTANCE =
+ new ExecutionEntry(new DescriptorProperties(true));
+
+ public static final String EXECUTION_PLANNER = "planner";
+
+ public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";
+
+ public static final String EXECUTION_TYPE = "type";
+
+ public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+ public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+ private static final String EXECUTION_TIME_CHARACTERISTIC = "time-characteristic";
+
+ private static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME = "event-time";
+
+ private static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME =
+ "processing-time";
+
+ private static final String EXECUTION_PERIODIC_WATERMARKS_INTERVAL =
+ "periodic-watermarks-interval";
+
+ private static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention";
+
+ private static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention";
+
+ private static final String EXECUTION_PARALLELISM = "parallelism";
+
+ private static final String EXECUTION_MAX_PARALLELISM = "max-parallelism";
+
+ public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+ public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog";
+
+ public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+ public static final String EXECUTION_MAX_BUFFER_SIZE = "max_buffer_size";
+
+ private static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
+
+ private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
+
+ private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none";
+
+ private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay";
+
+ private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate";
+
+ private static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts";
+
+ private static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay";
+
+ private static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL =
+ "restart-strategy.failure-rate-interval";
+
+ private static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL =
+ "restart-strategy.max-failures-per-interval";
+
+ public static final String EXECUTION_CURRENT_CATALOG = "current-catalog";
+
+ public static final String EXECUTION_CURRENT_DATABASE = "current-database";
+
+ private ExecutionEntry(DescriptorProperties properties) {
+ super(properties);
+ }
+
+ @Override
+ protected void validate(DescriptorProperties properties) {
+ properties.validateEnumValues(
+ EXECUTION_PLANNER, true, Arrays.asList(EXECUTION_PLANNER_VALUE_BLINK));
+ properties.validateEnumValues(
+ EXECUTION_TYPE,
+ true,
+ Arrays.asList(EXECUTION_TYPE_VALUE_BATCH, EXECUTION_TYPE_VALUE_STREAMING));
+ properties.validateEnumValues(
+ EXECUTION_TIME_CHARACTERISTIC,
+ true,
+ Arrays.asList(
+ EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME,
+ EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME));
+ properties.validateLong(EXECUTION_PERIODIC_WATERMARKS_INTERVAL, true, 1);
+ properties.validateLong(EXECUTION_MIN_STATE_RETENTION, true, 0);
+ properties.validateLong(EXECUTION_MAX_STATE_RETENTION, true, 0);
+ properties.validateInt(EXECUTION_PARALLELISM, true, 1);
+ properties.validateInt(EXECUTION_MAX_PARALLELISM, true, 1);
+ properties.validateEnumValues(
+ EXECUTION_RESTART_STRATEGY_TYPE,
+ true,
+ Arrays.asList(
+ EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK,
+ EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE,
+ EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY,
+ EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE));
+ properties.validateInt(EXECUTION_RESTART_STRATEGY_ATTEMPTS, true, 1);
+ properties.validateLong(EXECUTION_RESTART_STRATEGY_DELAY, true, 0);
+ properties.validateLong(EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, true, 1);
+ properties.validateInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, true, 1);
+ properties.validateString(EXECUTION_CURRENT_CATALOG, true, 1);
+ properties.validateString(EXECUTION_CURRENT_DATABASE, true, 1);
+ properties.validateInt(EXECUTION_MAX_BUFFER_SIZE, true, 1);
+ }
+
+ public EnvironmentSettings getEnvironmentSettings() {
+ final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
+
+ if (inStreamingMode()) {
+ builder.inStreamingMode();
+ } else if (inBatchMode()) {
+ builder.inBatchMode();
+ }
+
+ final String planner =
+ properties.getOptionalString(EXECUTION_PLANNER).orElse(EXECUTION_PLANNER_VALUE_BLINK);
+
+ if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+ builder.useBlinkPlanner();
+ }
+
+ return builder.build();
+ }
+
+ public boolean inStreamingMode() {
+ return properties
+ .getOptionalString(EXECUTION_TYPE)
+ .map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
+ .orElse(false);
+ }
+
+ public boolean inBatchMode() {
+ return properties
+ .getOptionalString(EXECUTION_TYPE)
+ .map((v) -> v.equals(EXECUTION_TYPE_VALUE_BATCH))
+ .orElse(false);
+ }
+
+ public boolean isStreamingPlanner() {
+ final String planner =
+ properties.getOptionalString(EXECUTION_PLANNER).orElse(EXECUTION_PLANNER_VALUE_BLINK);
+
+ // Blink planner is a streaming planner
+ if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+ return true;
+ } else {
+ return inStreamingMode();
+ }
+ }
+
+ public boolean isBatchPlanner() {
+ final String planner =
+ properties.getOptionalString(EXECUTION_PLANNER).orElse(EXECUTION_PLANNER_VALUE_BLINK);
+
+ // Blink planner is not a batch planner
+ if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+ return false;
+ } else {
+ return inBatchMode();
+ }
+ }
+
+ public TimeCharacteristic getTimeCharacteristic() {
+ return properties
+ .getOptionalString(EXECUTION_TIME_CHARACTERISTIC)
+ .flatMap(
+ (v) -> {
+ switch (v) {
+ case EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
+ return Optional.of(TimeCharacteristic.EventTime);
+ case EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
+ return Optional.of(TimeCharacteristic.ProcessingTime);
+ default:
+ return Optional.empty();
+ }
+ })
+ .orElseGet(
+ () ->
+ useDefaultValue(
+ EXECUTION_TIME_CHARACTERISTIC,
+ TimeCharacteristic.EventTime,
+ EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME));
+ }
+
+ public long getPeriodicWatermarksInterval() {
+ return properties
+ .getOptionalLong(EXECUTION_PERIODIC_WATERMARKS_INTERVAL)
+ .orElseGet(() -> useDefaultValue(EXECUTION_PERIODIC_WATERMARKS_INTERVAL, 200L));
+ }
+
+ public int getParallelism() {
+ return properties
+ .getOptionalInt(EXECUTION_PARALLELISM)
+ .orElseGet(() -> useDefaultValue(EXECUTION_PARALLELISM, 1));
+ }
+
+ public int getMaxParallelism() {
+ return properties
+ .getOptionalInt(EXECUTION_MAX_PARALLELISM)
+ .orElseGet(() -> useDefaultValue(EXECUTION_MAX_PARALLELISM, 128));
+ }
+
+ public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+ return properties
+ .getOptionalString(EXECUTION_RESTART_STRATEGY_TYPE)
+ .flatMap(
+ (v) -> {
+ switch (v) {
+ case EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE:
+ return Optional.of(RestartStrategies.noRestart());
+ case EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY:
+ final int attempts =
+ properties
+ .getOptionalInt(EXECUTION_RESTART_STRATEGY_ATTEMPTS)
+ .orElseGet(
+ () ->
+ useDefaultValue(
+ EXECUTION_RESTART_STRATEGY_ATTEMPTS, Integer.MAX_VALUE));
+ final long fixedDelay =
+ properties
+ .getOptionalLong(EXECUTION_RESTART_STRATEGY_DELAY)
+ .orElseGet(
+ () -> useDefaultValue(EXECUTION_RESTART_STRATEGY_DELAY, 10_000L));
+ return Optional.of(RestartStrategies.fixedDelayRestart(attempts, fixedDelay));
+ case EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE:
+ final int failureRate =
+ properties
+ .getOptionalInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL)
+ .orElseGet(
+ () ->
+ useDefaultValue(
+ EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, 1));
+ final long failureInterval =
+ properties
+ .getOptionalLong(EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL)
+ .orElseGet(
+ () ->
+ useDefaultValue(
+ EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, 60_000L));
+ final long attemptDelay =
+ properties
+ .getOptionalLong(EXECUTION_RESTART_STRATEGY_DELAY)
+ .orElseGet(
+ () -> useDefaultValue(EXECUTION_RESTART_STRATEGY_DELAY, 10_000L));
+ return Optional.of(
+ RestartStrategies.failureRateRestart(
+ failureRate,
+ Time.milliseconds(failureInterval),
+ Time.milliseconds(attemptDelay)));
+ default:
+ return Optional.empty();
+ }
+ })
+ .orElseGet(
+ () ->
+ useDefaultValue(
+ EXECUTION_RESTART_STRATEGY_TYPE,
+ RestartStrategies.fallBackRestart(),
+ EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK));
+ }
+
+ public Optional<String> getCurrentCatalog() {
+ return properties.getOptionalString(EXECUTION_CURRENT_CATALOG);
+ }
+
+ public Optional<String> getCurrentDatabase() {
+ return properties.getOptionalString(EXECUTION_CURRENT_DATABASE);
+ }
+
+ public Map<String, String> asTopLevelMap() {
+ return properties.asPrefixedMap(EXECUTION_ENTRY + '.');
+ }
+
+ private <V> V useDefaultValue(String key, V defaultValue) {
+ return useDefaultValue(key, defaultValue, defaultValue.toString());
+ }
+
+ private <V> V useDefaultValue(String key, V defaultValue, String defaultString) {
+ LOG.info(
+ "Property '{}.{}' not specified. Using default value: {}",
+ EXECUTION_ENTRY,
+ key,
+ defaultString);
+ return defaultValue;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Merges two execution entries. The properties of the first execution entry might be overwritten
+ * by the second one.
+ */
+ public static ExecutionEntry merge(ExecutionEntry execution1, ExecutionEntry execution2) {
+ final Map<String, String> mergedProperties = new HashMap<>(execution1.asMap());
+ mergedProperties.putAll(execution2.asMap());
+
+ final DescriptorProperties properties = new DescriptorProperties(true);
+ properties.putProperties(mergedProperties);
+
+ return new ExecutionEntry(properties);
+ }
+
+ /**
+ * Creates a new execution entry enriched with additional properties that are prefixed with
+ * EngineEnvironment#EXECUTION_ENTRY.
+ */
+ public static ExecutionEntry enrich(
+ ExecutionEntry execution, Map<String, String> prefixedProperties) {
+ final Map<String, String> enrichedProperties = new HashMap<>(execution.asMap());
+
+ prefixedProperties.forEach(
+ (k, v) -> {
+ final String normalizedKey = k.toLowerCase();
+ if (k.startsWith(EXECUTION_ENTRY + '.')) {
+ enrichedProperties.put(normalizedKey.substring(EXECUTION_ENTRY.length() + 1), v);
+ }
+ });
+
+ final DescriptorProperties properties = new DescriptorProperties(true);
+ properties.putProperties(enrichedProperties);
+
+ return new ExecutionEntry(properties);
+ }
+}
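
The `merge` and `enrich` semantics documented in `ExecutionEntry` above can be illustrated with a minimal sketch. It assumes plain `java.util.Map` instances in place of Flink's `DescriptorProperties`; the class name `ExecutionPropsSketch` and the constant `PREFIX` are hypothetical and not part of this commit. Like the `enrich` method in the diff, the sketch tests the prefix on the original key and lower-cases the key only when storing it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; not part of the Kyuubi code base.
public class ExecutionPropsSketch {

  // Stands in for EngineEnvironment.EXECUTION_ENTRY + '.'
  static final String PREFIX = "execution.";

  // Entries of the second map overwrite entries of the first map.
  static Map<String, String> merge(Map<String, String> first, Map<String, String> second) {
    Map<String, String> merged = new HashMap<>(first);
    merged.putAll(second);
    return merged;
  }

  // Copies only keys that start with "execution.", strips the prefix,
  // and stores the remainder in lower case. Other keys are ignored.
  static Map<String, String> enrich(Map<String, String> base, Map<String, String> prefixed) {
    Map<String, String> enriched = new HashMap<>(base);
    prefixed.forEach(
        (k, v) -> {
          if (k.startsWith(PREFIX)) {
            enriched.put(k.toLowerCase().substring(PREFIX.length()), v);
          }
        });
    return enriched;
  }

  public static void main(String[] args) {
    Map<String, String> defaults = new HashMap<>();
    defaults.put("type", "streaming");
    defaults.put("parallelism", "1");

    Map<String, String> session = new HashMap<>();
    session.put("type", "batch");

    // "type" comes from the session map, "parallelism" from the defaults.
    Map<String, String> merged = merge(defaults, session);
    System.out.println("type=" + merged.get("type"));
    System.out.println("parallelism=" + merged.get("parallelism"));

    Map<String, String> overrides = new HashMap<>();
    overrides.put("execution.Parallelism", "4"); // prefix matches, key is lower-cased
    overrides.put("table.planner", "ignored"); // no "execution." prefix, skipped

    Map<String, String> enriched = enrich(merged, overrides);
    System.out.println("parallelism=" + enriched.get("parallelism"));
    System.out.println("has table.planner=" + enriched.containsKey("table.planner"));
  }
}
```

Because the prefix check is case-sensitive, a key such as `EXECUTION.type` would be skipped in this sketch even though the stored key is lower-cased.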
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/EngineContext.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/EngineContext.java
new file mode 100644
index 0000000..3b46057
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/EngineContext.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.context;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.cli.Options;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment;
+
+/** Context describing default environment, dependencies, flink config, etc. */
+public class EngineContext {
+ private final EngineEnvironment engineEnv;
+ private final List<URL> dependencies;
+ private final Configuration flinkConfig;
+ private final List<CustomCommandLine> commandLines;
+ private final Options commandLineOptions;
+ private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+ public EngineContext(EngineEnvironment engineEnv, List<URL> dependencies) {
+ this.engineEnv = engineEnv;
+ this.dependencies = dependencies;
+
+ // discover configuration
+ final String flinkConfigDir;
+ try {
+ // find the configuration directory
+ flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
+ // load the global configuration
+ this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);
+
+ // initialize default file system
+ FileSystem.initialize(
+ flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+ // load command lines for deployment
+ this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
+ this.commandLineOptions = collectCommandLineOptions(commandLines);
+ } catch (Exception e) {
+ throw new RuntimeException("Could not load Flink configuration.", e);
+ }
+ clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
+ }
+
+ /** Constructor for testing purposes. */
+ @VisibleForTesting
+ public EngineContext(
+ EngineEnvironment engineEnv,
+ List<URL> dependencies,
+ Configuration flinkConfig,
+ CustomCommandLine commandLine,
+ ClusterClientServiceLoader clusterClientServiceLoader) {
+ this.engineEnv = engineEnv;
+ this.dependencies = dependencies;
+ this.flinkConfig = flinkConfig;
+ this.commandLines = Collections.singletonList(commandLine);
+ this.commandLineOptions = collectCommandLineOptions(commandLines);
+ this.clusterClientServiceLoader = Objects.requireNonNull(clusterClientServiceLoader);
+ }
+
+ public Configuration getFlinkConfig() {
+ return flinkConfig;
+ }
+
+ public EngineEnvironment getEngineEnv() {
+ return engineEnv;
+ }
+
+ public List<URL> getDependencies() {
+ return dependencies;
+ }
+
+ public List<CustomCommandLine> getCommandLines() {
+ return commandLines;
+ }
+
+ public Options getCommandLineOptions() {
+ return commandLineOptions;
+ }
+
+ public ClusterClientServiceLoader getClusterClientServiceLoader() {
+ return clusterClientServiceLoader;
+ }
+
+ private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+ final Options customOptions = new Options();
+ for (CustomCommandLine customCommandLine : commandLines) {
+ customCommandLine.addGeneralOptions(customOptions);
+ customCommandLine.addRunOptions(customOptions);
+ }
+ return CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), customOptions);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof EngineContext)) {
+ return false;
+ }
+ EngineContext context = (EngineContext) o;
+ return Objects.equals(engineEnv, context.engineEnv)
+ && Objects.equals(dependencies, context.dependencies)
+ && Objects.equals(flinkConfig, context.flinkConfig)
+ && Objects.equals(commandLines, context.commandLines)
+ && Objects.equals(commandLineOptions, context.commandLineOptions)
+ && Objects.equals(clusterClientServiceLoader, context.clusterClientServiceLoader);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ engineEnv,
+ dependencies,
+ flinkConfig,
+ commandLines,
+ commandLineOptions,
+ clusterClientServiceLoader);
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java
new file mode 100644
index 0000000..7a9a6e6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.context;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Context for executing table programs. This class caches everything that can be cached across
+ * multiple queries as long as the session context does not change. This must be thread-safe as it
+ * might be reused across different query submissions.
+ *
+ * @param <ClusterID> cluster id
+ */
+public class ExecutionContext<ClusterID> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ExecutionContext.class);
+
+ private final EngineEnvironment engineEnvironment;
+ private final ClassLoader classLoader;
+
+ private final Configuration flinkConfig;
+
+ private TableEnvironmentInternal tableEnv;
+ private StreamExecutionEnvironment streamExecEnv;
+ private Executor executor;
+
+ private ExecutionContext(
+ EngineEnvironment engineEnvironment,
+ List<URL> dependencies,
+ Configuration flinkConfig,
+ ClusterClientServiceLoader clusterClientServiceLoader,
+ Options commandLineOptions,
+ List<CustomCommandLine> availableCommandLines)
+ throws FlinkException {
+ this.engineEnvironment = engineEnvironment;
+
+ this.flinkConfig = flinkConfig;
+
+ // create class loader
+ classLoader =
+ ClientUtils.buildUserCodeClassLoader(
+ dependencies, Collections.emptyList(), this.getClass().getClassLoader(), flinkConfig);
+
+ // Initialize the TableEnvironment.
+ initializeTableEnvironment();
+ }
+
+ /**
+ * Executes the given supplier using the execution context's classloader as thread classloader.
+ */
+ public <R> R wrapClassLoader(Supplier<R> supplier) {
+ try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+ return supplier.get();
+ }
+ }
+
+ /**
+ * Executes the given Runnable using the execution context's classloader as thread classloader.
+ */
+ void wrapClassLoader(Runnable runnable) {
+ try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+ runnable.run();
+ }
+ }
+
+ public TableEnvironmentInternal getTableEnvironment() {
+ return tableEnv;
+ }
+
+ /** Returns a builder for this {@link ExecutionContext}. */
+ public static Builder builder(
+ EngineEnvironment defaultEnv,
+ EngineEnvironment sessionEnv,
+ List<URL> dependencies,
+ Configuration configuration,
+ ClusterClientServiceLoader serviceLoader,
+ Options commandLineOptions,
+ List<CustomCommandLine> commandLines) {
+ return new Builder(
+ defaultEnv,
+ sessionEnv,
+ dependencies,
+ configuration,
+ serviceLoader,
+ commandLineOptions,
+ commandLines);
+ }
+
+ // ------------------------------------------------------------------------------------------------------------------
+ // Non-public methods
+ // ------------------------------------------------------------------------------------------------------------------
+
+ private TableEnvironmentInternal createStreamTableEnvironment(
+ StreamExecutionEnvironment env,
+ EnvironmentSettings settings,
+ TableConfig config,
+ Executor executor,
+ CatalogManager catalogManager,
+ ModuleManager moduleManager,
+ FunctionCatalog functionCatalog) {
+ final Map<String, String> plannerProperties = settings.toPlannerProperties();
+ final Planner planner =
+ ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+ .create(plannerProperties, executor, config, functionCatalog, catalogManager);
+
+ return new StreamTableEnvironmentImpl(
+ catalogManager,
+ moduleManager,
+ functionCatalog,
+ config,
+ env,
+ planner,
+ executor,
+ settings.isStreamingMode(),
+ classLoader);
+ }
+
+ private static Executor lookupExecutor(
+ Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
+ try {
+ ExecutorFactory executorFactory =
+ ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+ Method createMethod =
+ executorFactory
+ .getClass()
+ .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+ return (Executor)
+ createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
+ } catch (Exception e) {
+ throw new TableException(
+ "Could not instantiate the executor. Make sure a planner module is on the classpath", e);
+ }
+ }
+
+ private void initializeTableEnvironment() {
+ final EnvironmentSettings settings = engineEnvironment.getExecution().getEnvironmentSettings();
+ final TableConfig config = new TableConfig();
+ final ModuleManager moduleManager = new ModuleManager();
+ final CatalogManager catalogManager =
+ CatalogManager.newBuilder()
+ .classLoader(classLoader)
+ .config(config.getConfiguration())
+ .defaultCatalog(
+ settings.getBuiltInCatalogName(),
+ new GenericInMemoryCatalog(
+ settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
+ .build();
+ final FunctionCatalog functionCatalog =
+ new FunctionCatalog(config, catalogManager, moduleManager);
+
+ // The table environment must be created before the catalogs are initialized below.
+ createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);
+
+ // Switch to the catalog and database configured for this session, if any.
+ initializeCatalogs();
+ }
+
+ private void createTableEnvironment(
+ EnvironmentSettings settings,
+ TableConfig config,
+ CatalogManager catalogManager,
+ ModuleManager moduleManager,
+ FunctionCatalog functionCatalog) {
+ if (engineEnvironment.getExecution().isStreamingPlanner()) {
+ streamExecEnv = createStreamExecutionEnvironment();
+
+ final Map<String, String> executorProperties = settings.toExecutorProperties();
+ executor = lookupExecutor(executorProperties, streamExecEnv);
+ tableEnv =
+ createStreamTableEnvironment(
+ streamExecEnv,
+ settings,
+ config,
+ executor,
+ catalogManager,
+ moduleManager,
+ functionCatalog);
+ } else {
+ throw new RuntimeException("Unsupported execution type specified.");
+ }
+ }
+
+ private void initializeCatalogs() {
+ // Switch to the current catalog.
+ Optional<String> catalog = engineEnvironment.getExecution().getCurrentCatalog();
+ catalog.ifPresent(tableEnv::useCatalog);
+
+ // Switch to the current database.
+ Optional<String> database = engineEnvironment.getExecution().getCurrentDatabase();
+ database.ifPresent(tableEnv::useDatabase);
+ }
+
+ private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRestartStrategy(engineEnvironment.getExecution().getRestartStrategy());
+ env.setParallelism(engineEnvironment.getExecution().getParallelism());
+ env.setMaxParallelism(engineEnvironment.getExecution().getMaxParallelism());
+ env.getConfig()
+ .setAutoWatermarkInterval(engineEnvironment.getExecution().getPeriodicWatermarksInterval());
+ return env;
+ }
+
+ // ~ Inner Class -------------------------------------------------------------------------------
+
+ /** Builder for {@link ExecutionContext}. */
+ public static class Builder {
+ // Required members.
+ private final EngineEnvironment sessionEnv;
+ private final List<URL> dependencies;
+ private final Configuration configuration;
+ private final ClusterClientServiceLoader serviceLoader;
+ private final Options commandLineOptions;
+ private final List<CustomCommandLine> commandLines;
+
+ private EngineEnvironment defaultEnv;
+ private EngineEnvironment currentEnv;
+
+ private Builder(
+ EngineEnvironment defaultEnv,
+ @Nullable EngineEnvironment sessionEnv,
+ List<URL> dependencies,
+ Configuration configuration,
+ ClusterClientServiceLoader serviceLoader,
+ Options commandLineOptions,
+ List<CustomCommandLine> commandLines) {
+ this.defaultEnv = defaultEnv;
+ this.sessionEnv = sessionEnv;
+ this.dependencies = dependencies;
+ this.configuration = configuration;
+ this.serviceLoader = serviceLoader;
+ this.commandLineOptions = commandLineOptions;
+ this.commandLines = commandLines;
+ }
+
+ public Builder env(EngineEnvironment engineEnvironment) {
+ this.currentEnv = engineEnvironment;
+ return this;
+ }
+
+ public ExecutionContext<?> build() {
+ try {
+ return new ExecutionContext<>(
+ this.currentEnv == null
+ ? EngineEnvironment.merge(defaultEnv, sessionEnv)
+ : this.currentEnv,
+ this.dependencies,
+ this.configuration,
+ this.serviceLoader,
+ this.commandLineOptions,
+ this.commandLines);
+ } catch (Throwable t) {
+ // catch everything so that a bad configuration does not crash the executor
+ throw new RuntimeException("Could not create execution context.", t);
+ }
+ }
+ }
+}
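
Note on wrapClassLoader above: the two overloads run a callback with the execution context's classloader installed as the thread context classloader and restore the previous one afterwards. A minimal sketch of that pattern in plain Java follows, assuming nothing from Flink; the class name ClassLoaderSwapSketch and the method withContextClassLoader are hypothetical and only illustrate what TemporaryClassLoaderContext is used for here.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ClassLoaderSwapSketch {

  /** Runs the supplier with the given loader as thread context classloader, then restores it. */
  static <R> R withContextClassLoader(ClassLoader loader, Supplier<R> supplier) {
    Thread thread = Thread.currentThread();
    ClassLoader previous = thread.getContextClassLoader();
    thread.setContextClassLoader(loader);
    try {
      return supplier.get();
    } finally {
      // Restore the original loader even if the supplier throws.
      thread.setContextClassLoader(previous);
    }
  }

  public static void main(String[] args) {
    ClassLoader before = Thread.currentThread().getContextClassLoader();
    // Stand-in for the user-code classloader built from the session dependencies.
    ClassLoader userLoader = new URLClassLoader(new URL[0], before);

    ClassLoader seen =
        withContextClassLoader(userLoader, () -> Thread.currentThread().getContextClassLoader());

    System.out.println("inside callback, user loader active: " + (seen == userLoader));
    System.out.println(
        "after callback, original restored: "
            + (Thread.currentThread().getContextClassLoader() == before));
  }
}
```

The try/finally plays the role of the try-with-resources block in the patch: whatever the callback does, the calling thread leaves with the classloader it entered with.
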
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/SessionContext.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/SessionContext.java
new file mode 100644
index 0000000..b150942
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/SessionContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.context;
+
+import java.util.Objects;
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment;
+
+/** Context describing current session properties, original properties, ExecutionContext, etc. */
+public class SessionContext {
+ private final EngineEnvironment engineEnv;
+ private final EngineContext engineContext;
+ private ExecutionContext<?> executionContext;
+
+ public SessionContext(EngineEnvironment engineEnv, EngineContext engineContext) {
+ this.engineEnv = engineEnv;
+ this.engineContext = engineContext;
+ this.executionContext = createExecutionContextBuilder(engineEnv).build();
+ }
+
+ public ExecutionContext<?> getExecutionContext() {
+ return executionContext;
+ }
+
+ /** Returns an ExecutionContext.Builder for the given session {@link EngineEnvironment}. */
+ public ExecutionContext.Builder createExecutionContextBuilder(EngineEnvironment sessionEnv) {
+ return ExecutionContext.builder(
+ engineContext.getEngineEnv(),
+ sessionEnv,
+ engineContext.getDependencies(),
+ engineContext.getFlinkConfig(),
+ engineContext.getClusterClientServiceLoader(),
+ engineContext.getCommandLineOptions(),
+ engineContext.getCommandLines());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SessionContext)) {
+ return false;
+ }
+ SessionContext context = (SessionContext) o;
+ return Objects.equals(engineEnv, context.engineEnv)
+ && Objects.equals(executionContext, context.executionContext);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(engineEnv, executionContext);
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ColumnInfo.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ColumnInfo.java
new file mode 100644
index 0000000..98982a9
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ColumnInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.util.Preconditions;
+
+/** A column info represents a table column's structure with column name, column type. */
+public class ColumnInfo {
+
+ private String name;
+
+ private String type;
+
+ @Nullable private LogicalType logicalType;
+
+ public ColumnInfo(String name, String type) {
+ this.name = Preconditions.checkNotNull(name, "name must not be null");
+ this.type = Preconditions.checkNotNull(type, "type must not be null");
+ }
+
+ public static ColumnInfo create(String name, LogicalType type) {
+ return new ColumnInfo(name, type.toString());
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public LogicalType getLogicalType() {
+ if (logicalType == null) {
+ logicalType = LogicalTypeParser.parse(type);
+ }
+ return logicalType;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ColumnInfo that = (ColumnInfo) o;
+ return name.equals(that.name) && type.equals(that.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type);
+ }
+
+ @Override
+ public String toString() {
+ return "ColumnInfo{" + "name='" + name + '\'' + ", type='" + type + '\'' + '}';
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
new file mode 100644
index 0000000..6496d84
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+/** Constant column names. */
+public class Constants {
+
+ public static final String SHOW_CATALOGS_RESULT = "catalogs";
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java
new file mode 100644
index 0000000..9224f9d
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+/** Utility class for Flink operations. */
+public class OperationUtil {
+
+ public static ResultSet stringListToResultSet(List<String> strings, String columnName) {
+ List<Row> data = new ArrayList<>();
+ boolean isNullable = false;
+ int maxLength = VarCharType.DEFAULT_LENGTH;
+
+ for (String str : strings) {
+ if (str == null) {
+ isNullable = true;
+ } else {
+ maxLength = Math.max(str.length(), maxLength);
+ data.add(Row.of(str));
+ }
+ }
+
+ return ResultSet.builder()
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(ColumnInfo.create(columnName, new VarCharType(isNullable, maxLength)))
+ .data(data.toArray(new Row[0]))
+ .build();
+ }
+}
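
Note on stringListToResultSet above: the loop derives the column type from the data. Null entries are dropped from the rows but mark the column as nullable, and the column length is the longest string seen, never less than the default. The following is a rough standalone sketch of that bookkeeping without any Flink types; StringColumnSketch, Summary and summarize are hypothetical names, and the default length of 1 passed in main is an assumption standing in for VarCharType.DEFAULT_LENGTH.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StringColumnSketch {

  /** Plain holder for what the patch feeds into VarCharType and the row array. */
  static final class Summary {
    final boolean nullable;
    final int maxLength;
    final List<String> rows;

    Summary(boolean nullable, int maxLength, List<String> rows) {
      this.nullable = nullable;
      this.maxLength = maxLength;
      this.rows = rows;
    }
  }

  static Summary summarize(List<String> strings, int defaultLength) {
    List<String> rows = new ArrayList<>();
    boolean nullable = false;
    int maxLength = defaultLength;
    for (String str : strings) {
      if (str == null) {
        // A null value produces no row but makes the column nullable.
        nullable = true;
      } else {
        maxLength = Math.max(str.length(), maxLength);
        rows.add(str);
      }
    }
    return new Summary(nullable, maxLength, rows);
  }

  public static void main(String[] args) {
    // Assumed default length of 1, standing in for VarCharType.DEFAULT_LENGTH.
    Summary s = summarize(Arrays.asList("default_catalog", null, "hive"), 1);
    System.out.println(
        "nullable=" + s.nullable + ", maxLength=" + s.maxLength + ", rows=" + s.rows);
  }
}
```
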
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
new file mode 100644
index 0000000..8fb51c2
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+/** ResultKind defines the types of the result. */
+public enum ResultKind {
+ // rows with important content are available (DML, DQL)
+ SUCCESS_WITH_CONTENT
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
new file mode 100644
index 0000000..10fc35e
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+import com.google.common.collect.Iterators;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.kyuubi.operation.ArrayFetchIterator;
+import org.apache.kyuubi.operation.FetchIterator;
+
+/**
+ * A set of one statement execution result containing result kind, column infos, rows of data and
+ * change flags for streaming mode.
+ */
+public class ResultSet {
+
+ private final ResultKind resultKind;
+ private final List<ColumnInfo> columns;
+ private final FetchIterator<Row> data;
+
+ // null in batch mode
+ //
+ // list of boolean in streaming mode,
+ // true if the corresponding row is an append row, false if it's a retract row
+ private final List<Boolean> changeFlags;
+
+ private ResultSet(
+ ResultKind resultKind,
+ List<ColumnInfo> columns,
+ FetchIterator<Row> data,
+ @Nullable List<Boolean> changeFlags) {
+ this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind must not be null");
+ this.columns = Preconditions.checkNotNull(columns, "columns must not be null");
+ this.data = Preconditions.checkNotNull(data, "data must not be null");
+ if (data.hasNext()) {
+ Preconditions.checkArgument(
+ columns.size() == data.next().getArity(),
+ "the size of columns and the number of fields in the row should be equal");
+ // Rewind so that the row inspected above is not lost to later fetches.
+ data.fetchAbsolute(0);
+ }
+ this.changeFlags = changeFlags;
+ if (changeFlags != null) {
+ Preconditions.checkArgument(
+ Iterators.size((Iterator<?>) data) == changeFlags.size(),
+ "the size of data and the size of changeFlags should be equal");
+ }
+ }
+
+ public List<ColumnInfo> getColumns() {
+ return columns;
+ }
+
+ public FetchIterator<Row> getData() {
+ return data;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ResultSet resultSet = (ResultSet) o;
+ return resultKind.equals(resultSet.resultKind)
+ && columns.equals(resultSet.columns)
+ && data.equals(resultSet.data)
+ && Objects.equals(changeFlags, resultSet.changeFlags);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resultKind, columns, data, changeFlags);
+ }
+
+ @Override
+ public String toString() {
+ return "ResultSet{"
+ + "resultKind="
+ + resultKind
+ + ", columns="
+ + columns
+ + ", data="
+ + data
+ + ", changeFlags="
+ + changeFlags
+ + '}';
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Builder for {@link ResultSet}. */
+ public static class Builder {
+ private ResultKind resultKind = null;
+ private List<ColumnInfo> columns = null;
+ private FetchIterator<Row> data = null;
+ private List<Boolean> changeFlags = null;
+
+ private Builder() {}
+
+ /** Set {@link ResultKind}. */
+ public Builder resultKind(ResultKind resultKind) {
+ this.resultKind = resultKind;
+ return this;
+ }
+
+ /** Set {@link ColumnInfo}s. */
+ public Builder columns(ColumnInfo... columns) {
+ this.columns = Arrays.asList(columns);
+ return this;
+ }
+
+ /** Set {@link ColumnInfo}s. */
+ public Builder columns(List<ColumnInfo> columns) {
+ this.columns = columns;
+ return this;
+ }
+
+ /** Set data. */
+ public Builder data(FetchIterator<Row> data) {
+ this.data = data;
+ return this;
+ }
+
+ /** Set data. */
+ public Builder data(Row[] data) {
+ this.data = new ArrayFetchIterator<>(data);
+ return this;
+ }
+
+ /** Set change flags. */
+ public Builder changeFlags(List<Boolean> changeFlags) {
+ this.changeFlags = changeFlags;
+ return this;
+ }
+
+ /** Returns a {@link ResultSet} instance. */
+ public ResultSet build() {
+ return new ResultSet(resultKind, columns, data, changeFlags);
+ }
+ }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
new file mode 100644
index 0000000..d0bfd7a
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.engine.flink.context.SessionContext
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.RowSet
+import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
+import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.operation.OperationState.OperationState
+import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+abstract class FlinkOperation(
+ opType: OperationType,
+ session: Session)
+ extends AbstractOperation(opType, session) {
+
+ protected val sessionContext: SessionContext =
+ session.asInstanceOf[FlinkSessionImpl].getSessionContext
+ protected var resultSet: ResultSet = _
+
+ override protected def beforeRun(): Unit = {
+ setHasResultSet(true)
+ setState(OperationState.RUNNING)
+ }
+
+ override protected def afterRun(): Unit = {
+ state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(OperationState.FINISHED)
+ }
+ }
+ OperationLog.removeCurrentOperationLog()
+ }
+
+ override def cancel(): Unit = {
+ cleanup(OperationState.CANCELED)
+ }
+
+ override def close(): Unit = {
+ cleanup(OperationState.CLOSED)
+ try {
+ getOperationLog.foreach(_.close())
+ } catch {
+ case e: IOException =>
+ error(e.getMessage, e)
+ }
+ }
+
+ override def getResultSetSchema: TTableSchema = {
+ val tTableSchema = new TTableSchema()
+ resultSet.getColumns.asScala.zipWithIndex.foreach { case (f, i) =>
+ tTableSchema.addToColumns(RowSet.toTColumnDesc(f, i))
+ }
+ tTableSchema
+ }
+
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+ validateDefaultFetchOrientation(order)
+ assertState(OperationState.FINISHED)
+ setHasResultSet(true)
+ order match {
+ case FETCH_NEXT => resultSet.getData.fetchNext()
+ case FETCH_PRIOR => resultSet.getData.fetchPrior(rowSetSize)
+ case FETCH_FIRST => resultSet.getData.fetchAbsolute(0)
+ }
+ val token = resultSet.getData.take(rowSetSize)
+ val resultRowSet = RowSet.resultSetToTRowSet(
+ token.toList,
+ resultSet,
+ getProtocolVersion)
+ resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
+ resultRowSet
+ }
+
+ override def shouldRunAsync: Boolean = false
+
+ protected def cleanup(targetState: OperationState): Unit = state.synchronized {
+ if (!isTerminalState(state)) {
+ setState(targetState)
+ if (shouldRunAsync) {
+ Option(getBackgroundHandle).foreach(_.cancel(true))
+ }
+ }
+ }
+
+ protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
+ // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
+ // could be thrown.
+ case e: Throwable =>
+ state.synchronized {
+ val errMsg = Utils.stringifyException(e)
+ if (state == OperationState.TIMEOUT) {
+ val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
+ setOperationException(ke)
+ throw ke
+ } else if (isTerminalState(state)) {
+ setOperationException(KyuubiSQLException(errMsg))
+ warn(s"Ignore exception in terminal state with $statementId: $errMsg")
+ } else {
+ setState(OperationState.ERROR)
+ error(s"Error operating $opType: $errMsg", e)
+ val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
+ setOperationException(ke)
+ throw ke
+ }
+ }
+ }
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
new file mode 100644
index 0000000..df83949
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util
+
+import org.apache.kyuubi.operation.{Operation, OperationManager}
+import org.apache.kyuubi.session.Session
+
+class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManager") {
+
+ override def newExecuteStatementOperation(
+ session: Session,
+ statement: String,
+ runAsync: Boolean,
+ queryTimeout: Long): Operation = null
+
+ override def newGetTypeInfoOperation(session: Session): Operation = null
+
+ override def newGetCatalogsOperation(session: Session): Operation = {
+ val op = new GetCatalogs(session)
+ addOperation(op)
+ }
+
+ override def newGetSchemasOperation(
+ session: Session,
+ catalog: String,
+ schema: String): Operation = null
+
+ override def newGetTablesOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ tableTypes: util.List[String]): Operation = null
+
+ override def newGetTableTypesOperation(session: Session): Operation = null
+
+ override def newGetColumnsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ columnName: String): Operation = null
+
+ override def newGetFunctionsOperation(
+ session: Session,
+ catalogName: String,
+ schemaName: String,
+ functionName: String): Operation = null
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
new file mode 100644
index 0000000..3a7a974
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class GetCatalogs(session: Session)
+ extends FlinkOperation(OperationType.GET_CATALOGS, session) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ val catalogs: java.util.List[String] =
+ tableEnv.listCatalogs.toList.asJava
+ resultSet = OperationUtil.stringListToResultSet(
+ catalogs,
+ Constants.SHOW_CATALOGS_RESULT)
+ } catch onError()
+ }
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
new file mode 100644
index 0000000..069ade7
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.schema
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
+import scala.language.implicitConversions
+
+import org.apache.flink.table.types.logical._
+import org.apache.flink.types.Row
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultSet}
+
+object RowSet {
+
+ def resultSetToTRowSet(
+ rows: Seq[Row],
+ resultSet: ResultSet,
+ protocolVersion: TProtocolVersion): TRowSet = {
+ if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+ toRowBaseSet(rows, resultSet)
+ } else {
+ toColumnBasedSet(rows, resultSet)
+ }
+ }
+
+ def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+ val tRows = rows.map { row =>
+ val tRow = new TRow()
+ (0 until row.getArity).map(i => toTColumnValue(i, row, resultSet))
+ .foreach(tRow.addToColVals)
+ tRow
+ }.asJava
+
+ new TRowSet(0, tRows)
+ }
+
+ def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+ val size = rows.length
+ val tRowSet = new TRowSet(0, new util.ArrayList[TRow](size))
+ resultSet.getColumns.asScala.zipWithIndex.foreach { case (field, i) =>
+ val tColumn = toTColumn(rows, i, field.getLogicalType)
+ tRowSet.addToColumns(tColumn)
+ }
+ tRowSet
+ }
+
+ private def toTColumnValue(
+ ordinal: Int,
+ row: Row,
+ resultSet: ResultSet): TColumnValue = {
+
+ val logicalType = resultSet.getColumns.get(ordinal).getLogicalType
+
+ if (logicalType.isInstanceOf[BooleanType]) {
+ val boolValue = new TBoolValue
+ if (row.getField(ordinal) != null) {
+ boolValue.setValue(row.getField(ordinal).asInstanceOf[Boolean])
+ }
+ TColumnValue.boolVal(boolValue)
+ } else if (logicalType.isInstanceOf[TinyIntType]) {
+ val tI16Value = new TI16Value
+ if (row.getField(ordinal) != null) {
+ tI16Value.setValue(row.getField(ordinal).asInstanceOf[Byte].toShort)
+ }
+ TColumnValue.i16Val(tI16Value)
+ } else if (logicalType.isInstanceOf[IntType]) {
+ val tI32Value = new TI32Value
+ if (row.getField(ordinal) != null) {
+ tI32Value.setValue(row.getField(ordinal).asInstanceOf[Int])
+ }
+ TColumnValue.i32Val(tI32Value)
+ } else if (logicalType.isInstanceOf[BigIntType]) {
+ val tI64Value = new TI64Value
+ if (row.getField(ordinal) != null) {
+ tI64Value.setValue(row.getField(ordinal).asInstanceOf[Long])
+ }
+ TColumnValue.i64Val(tI64Value)
+ } else if (logicalType.isInstanceOf[FloatType]) {
+ val tDoubleValue = new TDoubleValue
+ if (row.getField(ordinal) != null) {
+ tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
+ }
+ TColumnValue.doubleVal(tDoubleValue)
+ } else if (logicalType.isInstanceOf[DoubleType]) {
+ val tDoubleValue = new TDoubleValue
+ if (row.getField(ordinal) != null) {
+ tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Double])
+ }
+ TColumnValue.doubleVal(tDoubleValue)
+ } else if (logicalType.isInstanceOf[VarCharType]) {
+ val tStringValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+ }
+ TColumnValue.stringVal(tStringValue)
+ } else {
+ val tStrValue = new TStringValue
+ if (row.getField(ordinal) != null) {
+ // TODO: convert values of the remaining logical types; until then the value stays unset
+ }
+ TColumnValue.stringVal(tStrValue)
+ }
+ }
+
+ implicit private def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
+ ByteBuffer.wrap(bitSet.toByteArray)
+ }
+
+ private def toTColumn(
+ rows: Seq[Row],
+ ordinal: Int,
+ logicalType: LogicalType): TColumn = {
+ val nulls = new java.util.BitSet()
+ if (logicalType.isInstanceOf[BooleanType]) {
+ val values = getOrSetAsNull[java.lang.Boolean](
+ rows,
+ ordinal,
+ nulls,
+ true)
+ TColumn.boolVal(new TBoolColumn(values, nulls))
+ } else if (logicalType.isInstanceOf[TinyIntType]) {
+ val values = getOrSetAsNull[java.lang.Short](
+ rows,
+ ordinal,
+ nulls,
+ 0.toShort)
+ TColumn.i16Val(new TI16Column(values, nulls))
+ } else if (logicalType.isInstanceOf[VarCharType]) {
+ val values = getOrSetAsNull[java.lang.String](
+ rows,
+ ordinal,
+ nulls,
+ "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ } else if (logicalType.isInstanceOf[CharType]) {
+ val values = getOrSetAsNull[java.lang.String](
+ rows,
+ ordinal,
+ nulls,
+ "")
+ TColumn.stringVal(new TStringColumn(values, nulls))
+ } else {
+ null
+ }
+ }
+
+ private def getOrSetAsNull[T](
+ rows: Seq[Row],
+ ordinal: Int,
+ nulls: java.util.BitSet,
+ defaultVal: T): java.util.List[T] = {
+ val size = rows.length
+ val ret = new java.util.ArrayList[T](size)
+ var idx = 0
+ while (idx < size) {
+ val row = rows(idx)
+ val isNull = row.getField(ordinal) == null
+ if (isNull) {
+ nulls.set(idx, true)
+ ret.add(idx, defaultVal)
+ } else {
+ ret.add(idx, row.getField(ordinal).asInstanceOf[T])
+ }
+ idx += 1
+ }
+ ret
+ }
+
+ def toTColumnDesc(field: ColumnInfo, pos: Int): TColumnDesc = {
+ val tColumnDesc = new TColumnDesc()
+ tColumnDesc.setColumnName(field.getName)
+ tColumnDesc.setTypeDesc(toTTypeDesc(field.getLogicalType))
+ tColumnDesc.setComment("")
+ tColumnDesc.setPosition(pos)
+ tColumnDesc
+ }
+
+ def toTTypeDesc(typ: LogicalType): TTypeDesc = {
+ val typeEntry = new TPrimitiveTypeEntry(toTTypeId(typ))
+ typeEntry.setTypeQualifiers(toTTypeQualifiers(typ))
+ val tTypeDesc = new TTypeDesc()
+ tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry))
+ tTypeDesc
+ }
+
+ def toTTypeQualifiers(typ: LogicalType): TTypeQualifiers = {
+ val ret = new TTypeQualifiers()
+ val qualifiers = typ match {
+ case d: DecimalType =>
+ Map(
+ TCLIServiceConstants.PRECISION -> TTypeQualifierValue.i32Value(d.getPrecision),
+ TCLIServiceConstants.SCALE -> TTypeQualifierValue.i32Value(d.getScale)).asJava
+ case _ => Collections.emptyMap[String, TTypeQualifierValue]()
+ }
+ ret.setQualifiers(qualifiers)
+ ret
+ }
+
+ def toTTypeId(typ: LogicalType): TTypeId =
+ if (typ.isInstanceOf[NullType]) {
+ TTypeId.NULL_TYPE
+ } else if (typ.isInstanceOf[BooleanType]) {
+ TTypeId.BOOLEAN_TYPE
+ } else if (typ.isInstanceOf[FloatType]) {
+ TTypeId.FLOAT_TYPE
+ } else if (typ.isInstanceOf[DoubleType]) {
+ TTypeId.DOUBLE_TYPE
+ } else if (typ.isInstanceOf[VarCharType]) {
+ TTypeId.STRING_TYPE
+ } else if (typ.isInstanceOf[CharType]) {
+ TTypeId.STRING_TYPE
+ } else {
+ null
+ }
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
new file mode 100644
index 0000000..305ef10
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.session
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.engine.flink.context.EngineContext
+import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
+import org.apache.kyuubi.session.{SessionHandle, SessionManager}
+
+class FlinkSQLSessionManager(engineContext: EngineContext)
+ extends SessionManager("FlinkSQLSessionManager") {
+
+ override protected def isServer: Boolean = false
+
+ val operationManager = new FlinkSQLOperationManager()
+
+ override def openSession(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String]): SessionHandle = null
+
+ override def closeSession(sessionHandle: SessionHandle): Unit = {}
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
new file mode 100644
index 0000000..eda799f
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.session
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.engine.flink.context.SessionContext
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+
+class FlinkSessionImpl(
+ protocol: TProtocolVersion,
+ user: String,
+ password: String,
+ ipAddress: String,
+ conf: Map[String, String],
+ sessionManager: SessionManager,
+ sessionContext: SessionContext)
+ extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+
+ override val handle: SessionHandle = SessionHandle(protocol)
+
+ def getSessionContext: SessionContext = sessionContext
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
new file mode 100644
index 0000000..d95c93e
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.Collections
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.flink.client.cli.DefaultCLI
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
+import org.apache.flink.configuration.Configuration
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment
+import org.apache.kyuubi.engine.flink.config.entries.ExecutionEntry
+import org.apache.kyuubi.engine.flink.context.{EngineContext, SessionContext}
+import org.apache.kyuubi.engine.flink.session.{FlinkSessionImpl, FlinkSQLSessionManager}
+import org.apache.kyuubi.operation.FetchOrientation
+
+class FlinkOperationSuite extends KyuubiFunSuite {
+
+ val user: String = Utils.currentUser
+ val password = "anonymous"
+
+ var engineEnv = new EngineEnvironment
+ var engineContext = new EngineContext(
+ engineEnv,
+ Collections.emptyList(),
+ new Configuration,
+ new DefaultCLI,
+ new DefaultClusterClientServiceLoader)
+ var sessionContext: SessionContext = _
+ var flinkSession: FlinkSessionImpl = _
+
+ override def beforeAll(): Unit = {
+ engineEnv = EngineEnvironment.enrich(
+ engineContext.getEngineEnv,
+ Map(EngineEnvironment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_TYPE
+ -> ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH).asJava)
+ sessionContext = new SessionContext(engineEnv, engineContext)
+ val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
+ flinkSQLSessionManager.initialize(KyuubiConf())
+ flinkSession = new FlinkSessionImpl(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+ user,
+ password,
+ "localhost",
+ Map(),
+ flinkSQLSessionManager,
+ sessionContext)
+ super.beforeAll()
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ }
+
+ test("get catalogs for flink sql") {
+ val getCatalogOperation = new GetCatalogs(flinkSession)
+ getCatalogOperation.run()
+
+ val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+ assert(1 == resultSet.getRowsSize)
+ assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
+ }
+
+}
diff --git a/pom.xml b/pom.xml
index 86ea42b..f0b36ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -988,6 +988,80 @@
<artifactId>swagger-ui</artifactId>
<version>${swagger-ui.version}</version>
</dependency>
+
+ <!-- flink -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-parser</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-yarn_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>