Posted to commits@kyuubi.apache.org by ch...@apache.org on 2021/12/14 09:40:24 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #1525][FLINK] Implement GetCatalogs operation and do the further initialization

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 53ff56b  [KYUUBI #1525][FLINK] Implement GetCatalogs operation and do the further initialization
53ff56b is described below

commit 53ff56be29884b97898bd1363df3704d42d9fa9e
Author: yanghua <ya...@gmail.com>
AuthorDate: Tue Dec 14 17:40:11 2021 +0800

    [KYUUBI #1525][FLINK] Implement GetCatalogs operation and do the further initialization
    
    …
    
    ### _Why are the changes needed?_
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before making a pull request (an example command follows)
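
    A hedged example of running this module's tests locally, assuming the
    repository's `build/mvn` wrapper (the module path is taken from the file
    list below; `-pl` is Maven's standard project-selection flag):

        build/mvn test -pl externals/kyuubi-flink-sql-engine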
    
    Closes #1529 from yanghua/KYUUBI-1525.
    
    Closes #1525
    
    e166965d [yanghua] Address review suggestion
    9929aa29 [yanghua] Remove some comment
    eafb200b [yanghua] refactor code
    58df34c7 [yanghua] refactor code
    675e14e7 [yanghua] pruning unnecessary code
    15e97283 [yanghua] pruning unnecessary code
    c6d4f549 [yanghua] pruning unnecessary code
    b099a455 [yanghua] pruning unnecessary code
    316086bb [yanghua] pruning unnecessary code
    fe7af8ef [yanghua] pruning unnecessary code
    ecf3ccf4 [yanghua] pruning unnecessary code
    4c174517 [yanghua] pruning unnecessary code
    b9226c3e [yanghua] pruning unnecessary code
    fb2f6a62 [yanghua] pruning unnecessary code
    21e21a50 [yanghua] pruning unnecessary code
    a58076ff [yanghua] pruning unnecessary code
    acd24fc5 [yanghua] pruning unnecessary code
    333d4275 [yanghua] pruning unnecessary code
    7416b3de [yanghua] pruning unnecessary code
    84d0db91 [yanghua] pruning unnecessary code and refactor code
    a55db41a [yanghua] pruning unnecessary code
    b9909308 [yanghua] [KYUUBI #1525] Implement GetCatalogs operation and do the further initialization
    
    Authored-by: yanghua <ya...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 externals/kyuubi-flink-sql-engine/pom.xml          |  88 +++++
 .../engine/flink/config/EngineEnvironment.java     |  81 +++++
 .../engine/flink/config/entries/ConfigEntry.java   |  68 ++++
 .../flink/config/entries/ExecutionEntry.java       | 368 +++++++++++++++++++++
 .../kyuubi/engine/flink/context/EngineContext.java | 149 +++++++++
 .../engine/flink/context/ExecutionContext.java     | 306 +++++++++++++++++
 .../engine/flink/context/SessionContext.java       |  69 ++++
 .../kyuubi/engine/flink/result/ColumnInfo.java     |  81 +++++
 .../kyuubi/engine/flink/result/Constants.java      |  25 ++
 .../kyuubi/engine/flink/result/OperationUtil.java  |  49 +++
 .../kyuubi/engine/flink/result/ResultKind.java     |  25 ++
 .../kyuubi/engine/flink/result/ResultSet.java      | 165 +++++++++
 .../engine/flink/operation/FlinkOperation.scala    | 135 ++++++++
 .../flink/operation/FlinkSQLOperationManager.scala |  67 ++++
 .../engine/flink/operation/GetCatalogs.scala       |  40 +++
 .../apache/kyuubi/engine/flink/schema/RowSet.scala | 236 +++++++++++++
 .../flink/session/FlinkSQLSessionManager.scala     |  41 +++
 .../engine/flink/session/FlinkSessionImpl.scala    |  39 +++
 .../flink/operation/FlinkOperationSuite.scala      |  84 +++++
 pom.xml                                            |  74 +++++
 20 files changed, 2190 insertions(+)

diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index b2c49a4..a652b9d 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -45,6 +45,94 @@
       <version>${project.version}</version>
     </dependency>
 
+    <!-- flink -->
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-common</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-sql-parser</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-yarn_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <!-- tests -->
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-common_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kyuubi</groupId>
+      <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>jul-to-slf4j</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
 </project>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/EngineEnvironment.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/EngineEnvironment.java
new file mode 100644
index 0000000..20bc0c0
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/EngineEnvironment.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.config;
+
+import java.util.Collections;
+import java.util.Map;
+import org.apache.kyuubi.engine.flink.config.entries.ExecutionEntry;
+
+/**
+ * EngineEnvironment configuration that represents the content of an environment file.
+ * EngineEnvironment files define engine, session, catalogs, tables, execution, and deployment
+ * behavior. An environment might be defined by default or as part of a session. Environments can be
+ * merged or enriched with properties.
+ */
+public class EngineEnvironment {
+
+  public static final String EXECUTION_ENTRY = "execution";
+
+  private ExecutionEntry execution;
+
+  public EngineEnvironment() {
+    this.execution = ExecutionEntry.DEFAULT_INSTANCE;
+  }
+
+  public ExecutionEntry getExecution() {
+    return execution;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder();
+    sb.append("=================== Execution ====================\n");
+    execution.asTopLevelMap().forEach((k, v) -> sb.append(k).append(": ").append(v).append('\n'));
+    return sb.toString();
+  }
+
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Merges two environments. The properties of the first environment might be overwritten by the
+   * second one.
+   */
+  public static EngineEnvironment merge(EngineEnvironment env1, EngineEnvironment env2) {
+    final EngineEnvironment mergedEnv = new EngineEnvironment();
+
+    // merge execution properties
+    mergedEnv.execution = ExecutionEntry.merge(env1.getExecution(), env2.getExecution());
+
+    return mergedEnv;
+  }
+
+  public EngineEnvironment clone() {
+    return enrich(this, Collections.emptyMap());
+  }
+
+  /** Enriches an environment with new/modified properties or views and returns the new instance. */
+  public static EngineEnvironment enrich(EngineEnvironment env, Map<String, String> properties) {
+    final EngineEnvironment enrichedEnv = new EngineEnvironment();
+
+    // enrich execution properties
+    enrichedEnv.execution = ExecutionEntry.enrich(env.execution, properties);
+
+    return enrichedEnv;
+  }
+}
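
A minimal usage sketch of the merge/enrich semantics above (hypothetical code,
not part of the patch, imports elided; the "execution."-prefixed keys are the
ones handled by ExecutionEntry.enrich shown in the next file):

    // Hypothetical sketch: session properties prefixed with "execution."
    // override the defaults of the base environment.
    Map<String, String> overrides = new HashMap<>();
    overrides.put("execution.parallelism", "4");
    overrides.put("execution.current-catalog", "my_catalog");

    EngineEnvironment defaults = new EngineEnvironment();
    EngineEnvironment sessionEnv = EngineEnvironment.enrich(defaults, overrides);

    // merge() starts from env1's properties and lets env2 overwrite them.
    EngineEnvironment merged = EngineEnvironment.merge(defaults, sessionEnv);
    int parallelism = merged.getExecution().getParallelism(); // 4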
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ConfigEntry.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ConfigEntry.java
new file mode 100644
index 0000000..59540c4
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ConfigEntry.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.config.entries;
+
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+
+/**
+ * Describes an environment configuration entry (such as catalogs, tables, functions, and views). Config
+ * entries are similar to {@link org.apache.flink.table.descriptors.Descriptor} but apply to SQL
+ * Engine's environment files only.
+ */
+abstract class ConfigEntry {
+
+  protected final DescriptorProperties properties;
+
+  protected ConfigEntry(DescriptorProperties properties) {
+    try {
+      validate(properties);
+    } catch (ValidationException e) {
+      throw new RuntimeException("Invalid configuration entry.", e);
+    }
+
+    this.properties = properties;
+  }
+
+  /** Performs syntactic validation. */
+  protected abstract void validate(DescriptorProperties properties);
+
+  public Map<String, String> asMap() {
+    return properties.asMap();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ConfigEntry that = (ConfigEntry) o;
+    return Objects.equals(properties, that.properties);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(properties);
+  }
+}
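
The validate() contract above can be illustrated with a hypothetical subclass
(a sketch only; DescriptorProperties.validateString is the same Flink API that
ExecutionEntry uses in the next file):

    /** Hypothetical entry with a single required property, for illustration. */
    class NameEntry extends ConfigEntry {
      NameEntry(DescriptorProperties properties) {
        super(properties); // the base constructor runs validate() before storing
      }

      @Override
      protected void validate(DescriptorProperties properties) {
        // key "name" is required (isOptional = false) with minimum length 1
        properties.validateString("name", false, 1);
      }
    }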
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ExecutionEntry.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ExecutionEntry.java
new file mode 100644
index 0000000..279b72d
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/config/entries/ExecutionEntry.java
@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.config.entries;
+
+import static org.apache.kyuubi.engine.flink.config.EngineEnvironment.EXECUTION_ENTRY;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration of a table program execution. This class parses the `execution` part in an
+ * environment file. The execution describes properties that would usually be defined in the
+ * ExecutionEnvironment/StreamExecutionEnvironment/TableEnvironment or as code in a Flink job.
+ *
+ * <p>All properties of this entry are optional and evaluated lazily. TODO remove the useless
+ * properties
+ */
+public class ExecutionEntry extends ConfigEntry {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionEntry.class);
+
+  public static final ExecutionEntry DEFAULT_INSTANCE =
+      new ExecutionEntry(new DescriptorProperties(true));
+
+  public static final String EXECUTION_PLANNER = "planner";
+
+  public static final String EXECUTION_PLANNER_VALUE_BLINK = "blink";
+
+  public static final String EXECUTION_TYPE = "type";
+
+  public static final String EXECUTION_TYPE_VALUE_STREAMING = "streaming";
+
+  public static final String EXECUTION_TYPE_VALUE_BATCH = "batch";
+
+  private static final String EXECUTION_TIME_CHARACTERISTIC = "time-characteristic";
+
+  private static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME = "event-time";
+
+  private static final String EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME =
+      "processing-time";
+
+  private static final String EXECUTION_PERIODIC_WATERMARKS_INTERVAL =
+      "periodic-watermarks-interval";
+
+  private static final String EXECUTION_MIN_STATE_RETENTION = "min-idle-state-retention";
+
+  private static final String EXECUTION_MAX_STATE_RETENTION = "max-idle-state-retention";
+
+  private static final String EXECUTION_PARALLELISM = "parallelism";
+
+  private static final String EXECUTION_MAX_PARALLELISM = "max-parallelism";
+
+  public static final String EXECUTION_RESULT_MODE = "result-mode";
+
+  public static final String EXECUTION_RESULT_MODE_VALUE_CHANGELOG = "changelog";
+
+  public static final String EXECUTION_RESULT_MODE_VALUE_TABLE = "table";
+
+  public static final String EXECUTION_MAX_BUFFER_SIZE = "max_buffer_size";
+
+  private static final String EXECUTION_RESTART_STRATEGY_TYPE = "restart-strategy.type";
+
+  private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK = "fallback";
+
+  private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE = "none";
+
+  private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY = "fixed-delay";
+
+  private static final String EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE = "failure-rate";
+
+  private static final String EXECUTION_RESTART_STRATEGY_ATTEMPTS = "restart-strategy.attempts";
+
+  private static final String EXECUTION_RESTART_STRATEGY_DELAY = "restart-strategy.delay";
+
+  private static final String EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL =
+      "restart-strategy.failure-rate-interval";
+
+  private static final String EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL =
+      "restart-strategy.max-failures-per-interval";
+
+  public static final String EXECUTION_CURRENT_CATALOG = "current-catalog";
+
+  public static final String EXECUTION_CURRENT_DATABASE = "current-database";
+
+  private ExecutionEntry(DescriptorProperties properties) {
+    super(properties);
+  }
+
+  @Override
+  protected void validate(DescriptorProperties properties) {
+    properties.validateEnumValues(
+        EXECUTION_PLANNER, true, Arrays.asList(EXECUTION_PLANNER_VALUE_BLINK));
+    properties.validateEnumValues(
+        EXECUTION_TYPE,
+        true,
+        Arrays.asList(EXECUTION_TYPE_VALUE_BATCH, EXECUTION_TYPE_VALUE_STREAMING));
+    properties.validateEnumValues(
+        EXECUTION_TIME_CHARACTERISTIC,
+        true,
+        Arrays.asList(
+            EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME,
+            EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME));
+    properties.validateLong(EXECUTION_PERIODIC_WATERMARKS_INTERVAL, true, 1);
+    properties.validateLong(EXECUTION_MIN_STATE_RETENTION, true, 0);
+    properties.validateLong(EXECUTION_MAX_STATE_RETENTION, true, 0);
+    properties.validateInt(EXECUTION_PARALLELISM, true, 1);
+    properties.validateInt(EXECUTION_MAX_PARALLELISM, true, 1);
+    properties.validateEnumValues(
+        EXECUTION_RESTART_STRATEGY_TYPE,
+        true,
+        Arrays.asList(
+            EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK,
+            EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE,
+            EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY,
+            EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE));
+    properties.validateInt(EXECUTION_RESTART_STRATEGY_ATTEMPTS, true, 1);
+    properties.validateLong(EXECUTION_RESTART_STRATEGY_DELAY, true, 0);
+    properties.validateLong(EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, true, 1);
+    properties.validateInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, true, 1);
+    properties.validateString(EXECUTION_CURRENT_CATALOG, true, 1);
+    properties.validateString(EXECUTION_CURRENT_DATABASE, true, 1);
+    properties.validateInt(EXECUTION_MAX_BUFFER_SIZE, true, 1);
+  }
+
+  public EnvironmentSettings getEnvironmentSettings() {
+    final EnvironmentSettings.Builder builder = EnvironmentSettings.newInstance();
+
+    if (inStreamingMode()) {
+      builder.inStreamingMode();
+    } else if (inBatchMode()) {
+      builder.inBatchMode();
+    }
+
+    final String planner =
+        properties.getOptionalString(EXECUTION_PLANNER).orElse(EXECUTION_PLANNER_VALUE_BLINK);
+
+    if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+      builder.useBlinkPlanner();
+    }
+
+    return builder.build();
+  }
+
+  public boolean inStreamingMode() {
+    return properties
+        .getOptionalString(EXECUTION_TYPE)
+        .map((v) -> v.equals(EXECUTION_TYPE_VALUE_STREAMING))
+        .orElse(false);
+  }
+
+  public boolean inBatchMode() {
+    return properties
+        .getOptionalString(EXECUTION_TYPE)
+        .map((v) -> v.equals(EXECUTION_TYPE_VALUE_BATCH))
+        .orElse(false);
+  }
+
+  public boolean isStreamingPlanner() {
+    final String planner =
+        properties.getOptionalString(EXECUTION_PLANNER).orElse(EXECUTION_PLANNER_VALUE_BLINK);
+
+    // Blink planner is a streaming planner
+    if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+      return true;
+    } else {
+      return inStreamingMode();
+    }
+  }
+
+  public boolean isBatchPlanner() {
+    final String planner =
+        properties.getOptionalString(EXECUTION_PLANNER).orElse(EXECUTION_PLANNER_VALUE_BLINK);
+
+    // Blink planner is not a batch planner
+    if (planner.equals(EXECUTION_PLANNER_VALUE_BLINK)) {
+      return false;
+    } else {
+      return inBatchMode();
+    }
+  }
+
+  public TimeCharacteristic getTimeCharacteristic() {
+    return properties
+        .getOptionalString(EXECUTION_TIME_CHARACTERISTIC)
+        .flatMap(
+            (v) -> {
+              switch (v) {
+                case EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME:
+                  return Optional.of(TimeCharacteristic.EventTime);
+                case EXECUTION_TIME_CHARACTERISTIC_VALUE_PROCESSING_TIME:
+                  return Optional.of(TimeCharacteristic.ProcessingTime);
+                default:
+                  return Optional.empty();
+              }
+            })
+        .orElseGet(
+            () ->
+                useDefaultValue(
+                    EXECUTION_TIME_CHARACTERISTIC,
+                    TimeCharacteristic.EventTime,
+                    EXECUTION_TIME_CHARACTERISTIC_VALUE_EVENT_TIME));
+  }
+
+  public long getPeriodicWatermarksInterval() {
+    return properties
+        .getOptionalLong(EXECUTION_PERIODIC_WATERMARKS_INTERVAL)
+        .orElseGet(() -> useDefaultValue(EXECUTION_PERIODIC_WATERMARKS_INTERVAL, 200L));
+  }
+
+  public int getParallelism() {
+    return properties
+        .getOptionalInt(EXECUTION_PARALLELISM)
+        .orElseGet(() -> useDefaultValue(EXECUTION_PARALLELISM, 1));
+  }
+
+  public int getMaxParallelism() {
+    return properties
+        .getOptionalInt(EXECUTION_MAX_PARALLELISM)
+        .orElseGet(() -> useDefaultValue(EXECUTION_MAX_PARALLELISM, 128));
+  }
+
+  public RestartStrategies.RestartStrategyConfiguration getRestartStrategy() {
+    return properties
+        .getOptionalString(EXECUTION_RESTART_STRATEGY_TYPE)
+        .flatMap(
+            (v) -> {
+              switch (v) {
+                case EXECUTION_RESTART_STRATEGY_TYPE_VALUE_NONE:
+                  return Optional.of(RestartStrategies.noRestart());
+                case EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FIXED_DELAY:
+                  final int attempts =
+                      properties
+                          .getOptionalInt(EXECUTION_RESTART_STRATEGY_ATTEMPTS)
+                          .orElseGet(
+                              () ->
+                                  useDefaultValue(
+                                      EXECUTION_RESTART_STRATEGY_ATTEMPTS, Integer.MAX_VALUE));
+                  final long fixedDelay =
+                      properties
+                          .getOptionalLong(EXECUTION_RESTART_STRATEGY_DELAY)
+                          .orElseGet(
+                              () -> useDefaultValue(EXECUTION_RESTART_STRATEGY_DELAY, 10_000L));
+                  return Optional.of(RestartStrategies.fixedDelayRestart(attempts, fixedDelay));
+                case EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FAILURE_RATE:
+                  final int failureRate =
+                      properties
+                          .getOptionalInt(EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL)
+                          .orElseGet(
+                              () ->
+                                  useDefaultValue(
+                                      EXECUTION_RESTART_STRATEGY_MAX_FAILURES_PER_INTERVAL, 1));
+                  final long failureInterval =
+                      properties
+                          .getOptionalLong(EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL)
+                          .orElseGet(
+                              () ->
+                                  useDefaultValue(
+                                      EXECUTION_RESTART_STRATEGY_FAILURE_RATE_INTERVAL, 60_000L));
+                  final long attemptDelay =
+                      properties
+                          .getOptionalLong(EXECUTION_RESTART_STRATEGY_DELAY)
+                          .orElseGet(
+                              () -> useDefaultValue(EXECUTION_RESTART_STRATEGY_DELAY, 10_000L));
+                  return Optional.of(
+                      RestartStrategies.failureRateRestart(
+                          failureRate,
+                          Time.milliseconds(failureInterval),
+                          Time.milliseconds(attemptDelay)));
+                default:
+                  return Optional.empty();
+              }
+            })
+        .orElseGet(
+            () ->
+                useDefaultValue(
+                    EXECUTION_RESTART_STRATEGY_TYPE,
+                    RestartStrategies.fallBackRestart(),
+                    EXECUTION_RESTART_STRATEGY_TYPE_VALUE_FALLBACK));
+  }
+
+  public Optional<String> getCurrentCatalog() {
+    return properties.getOptionalString(EXECUTION_CURRENT_CATALOG);
+  }
+
+  public Optional<String> getCurrentDatabase() {
+    return properties.getOptionalString(EXECUTION_CURRENT_DATABASE);
+  }
+
+  public Map<String, String> asTopLevelMap() {
+    return properties.asPrefixedMap(EXECUTION_ENTRY + '.');
+  }
+
+  private <V> V useDefaultValue(String key, V defaultValue) {
+    return useDefaultValue(key, defaultValue, defaultValue.toString());
+  }
+
+  private <V> V useDefaultValue(String key, V defaultValue, String defaultString) {
+    LOG.info(
+        "Property '{}.{}' not specified. Using default value: {}",
+        EXECUTION_ENTRY,
+        key,
+        defaultString);
+    return defaultValue;
+  }
+
+  // --------------------------------------------------------------------------------------------
+
+  /**
+   * Merges two execution entries. The properties of the first execution entry might be overwritten
+   * by the second one.
+   */
+  public static ExecutionEntry merge(ExecutionEntry execution1, ExecutionEntry execution2) {
+    final Map<String, String> mergedProperties = new HashMap<>(execution1.asMap());
+    mergedProperties.putAll(execution2.asMap());
+
+    final DescriptorProperties properties = new DescriptorProperties(true);
+    properties.putProperties(mergedProperties);
+
+    return new ExecutionEntry(properties);
+  }
+
+  /**
+   * Creates a new execution entry enriched with additional properties that are prefixed with
+   * EngineEnvironment#EXECUTION_ENTRY.
+   */
+  public static ExecutionEntry enrich(
+      ExecutionEntry execution, Map<String, String> prefixedProperties) {
+    final Map<String, String> enrichedProperties = new HashMap<>(execution.asMap());
+
+    prefixedProperties.forEach(
+        (k, v) -> {
+          final String normalizedKey = k.toLowerCase();
+          if (k.startsWith(EXECUTION_ENTRY + '.')) {
+            enrichedProperties.put(normalizedKey.substring(EXECUTION_ENTRY.length() + 1), v);
+          }
+        });
+
+    final DescriptorProperties properties = new DescriptorProperties(true);
+    properties.putProperties(enrichedProperties);
+
+    return new ExecutionEntry(properties);
+  }
+}
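
A sketch of how the accessors above behave when combined with enrich()
(hypothetical values, imports elided; the keys mirror the constants defined in
this class):

    Map<String, String> props = new HashMap<>();
    props.put("execution.type", "streaming");
    props.put("execution.max-parallelism", "256");

    ExecutionEntry entry = ExecutionEntry.enrich(ExecutionEntry.DEFAULT_INSTANCE, props);
    entry.inStreamingMode();        // true
    entry.getMaxParallelism();      // 256
    entry.getParallelism();         // unset: logs and falls back to the default 1
    entry.getEnvironmentSettings(); // blink planner, streaming mode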
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/EngineContext.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/EngineContext.java
new file mode 100644
index 0000000..3b46057
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/EngineContext.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.context;
+
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import org.apache.commons.cli.Options;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.cli.CliFrontendParser;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment;
+
+/** Context describing the default environment, dependencies, Flink configuration, etc. */
+public class EngineContext {
+  private final EngineEnvironment engineEnv;
+  private final List<URL> dependencies;
+  private final Configuration flinkConfig;
+  private final List<CustomCommandLine> commandLines;
+  private final Options commandLineOptions;
+  private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+  public EngineContext(EngineEnvironment engineEnv, List<URL> dependencies) {
+    this.engineEnv = engineEnv;
+    this.dependencies = dependencies;
+
+    // discover configuration
+    final String flinkConfigDir;
+    try {
+      // find the configuration directory
+      flinkConfigDir = CliFrontend.getConfigurationDirectoryFromEnv();
+
+      // load the global configuration
+      this.flinkConfig = GlobalConfiguration.loadConfiguration(flinkConfigDir);
+
+      // initialize default file system
+      FileSystem.initialize(
+          flinkConfig, PluginUtils.createPluginManagerFromRootFolder(flinkConfig));
+
+      // load command lines for deployment
+      this.commandLines = CliFrontend.loadCustomCommandLines(flinkConfig, flinkConfigDir);
+      this.commandLineOptions = collectCommandLineOptions(commandLines);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not load Flink configuration.", e);
+    }
+    clusterClientServiceLoader = new DefaultClusterClientServiceLoader();
+  }
+
+  /** Constructor for testing purposes. */
+  @VisibleForTesting
+  public EngineContext(
+      EngineEnvironment engineEnv,
+      List<URL> dependencies,
+      Configuration flinkConfig,
+      CustomCommandLine commandLine,
+      ClusterClientServiceLoader clusterClientServiceLoader) {
+    this.engineEnv = engineEnv;
+    this.dependencies = dependencies;
+    this.flinkConfig = flinkConfig;
+    this.commandLines = Collections.singletonList(commandLine);
+    this.commandLineOptions = collectCommandLineOptions(commandLines);
+    this.clusterClientServiceLoader = Objects.requireNonNull(clusterClientServiceLoader);
+  }
+
+  public Configuration getFlinkConfig() {
+    return flinkConfig;
+  }
+
+  public EngineEnvironment getEngineEnv() {
+    return engineEnv;
+  }
+
+  public List<URL> getDependencies() {
+    return dependencies;
+  }
+
+  public List<CustomCommandLine> getCommandLines() {
+    return commandLines;
+  }
+
+  public Options getCommandLineOptions() {
+    return commandLineOptions;
+  }
+
+  public ClusterClientServiceLoader getClusterClientServiceLoader() {
+    return clusterClientServiceLoader;
+  }
+
+  private Options collectCommandLineOptions(List<CustomCommandLine> commandLines) {
+    final Options customOptions = new Options();
+    for (CustomCommandLine customCommandLine : commandLines) {
+      customCommandLine.addGeneralOptions(customOptions);
+      customCommandLine.addRunOptions(customOptions);
+    }
+    return CliFrontendParser.mergeOptions(CliFrontendParser.getRunCommandOptions(), customOptions);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof EngineContext)) {
+      return false;
+    }
+    EngineContext context = (EngineContext) o;
+    return Objects.equals(engineEnv, context.engineEnv)
+        && Objects.equals(dependencies, context.dependencies)
+        && Objects.equals(flinkConfig, context.flinkConfig)
+        && Objects.equals(commandLines, context.commandLines)
+        && Objects.equals(commandLineOptions, context.commandLineOptions)
+        && Objects.equals(clusterClientServiceLoader, context.clusterClientServiceLoader);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(
+        engineEnv,
+        dependencies,
+        flinkConfig,
+        commandLines,
+        commandLineOptions,
+        clusterClientServiceLoader);
+  }
+}
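
A bootstrap sketch (hypothetical; assumes a Flink conf directory that
CliFrontend.getConfigurationDirectoryFromEnv can discover, e.g. via
FLINK_CONF_DIR):

    // Loads flink-conf.yaml, initializes file systems, and collects the
    // custom command lines used for deployment.
    EngineEnvironment engineEnv = new EngineEnvironment();
    EngineContext engineContext = new EngineContext(engineEnv, Collections.emptyList());

    Configuration flinkConfig = engineContext.getFlinkConfig();
    Options commandLineOptions = engineContext.getCommandLineOptions();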
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java
new file mode 100644
index 0000000..7a9a6e6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/ExecutionContext.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.context;
+
+import java.lang.reflect.Method;
+import java.net.URL;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.commons.cli.Options;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.cli.CustomCommandLine;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.FunctionCatalog;
+import org.apache.flink.table.catalog.GenericInMemoryCatalog;
+import org.apache.flink.table.delegation.Executor;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.Planner;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.ComponentFactoryService;
+import org.apache.flink.table.module.ModuleManager;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Context for executing table programs. This class caches everything that can be cached across
+ * multiple queries as long as the session context does not change. This must be thread-safe as it
+ * might be reused across different query submissions.
+ *
+ * @param <ClusterID> cluster id
+ */
+public class ExecutionContext<ClusterID> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutionContext.class);
+
+  private final EngineEnvironment engineEnvironment;
+  private final ClassLoader classLoader;
+
+  private final Configuration flinkConfig;
+
+  private TableEnvironmentInternal tableEnv;
+  private StreamExecutionEnvironment streamExecEnv;
+  private Executor executor;
+
+  private ExecutionContext(
+      EngineEnvironment engineEnvironment,
+      List<URL> dependencies,
+      Configuration flinkConfig,
+      ClusterClientServiceLoader clusterClientServiceLoader,
+      Options commandLineOptions,
+      List<CustomCommandLine> availableCommandLines)
+      throws FlinkException {
+    this.engineEnvironment = engineEnvironment;
+
+    this.flinkConfig = flinkConfig;
+
+    // create class loader
+    classLoader =
+        ClientUtils.buildUserCodeClassLoader(
+            dependencies, Collections.emptyList(), this.getClass().getClassLoader(), flinkConfig);
+
+    // Initialize the TableEnvironment.
+    initializeTableEnvironment();
+  }
+
+  /**
+   * Executes the given supplier using the execution context's classloader as thread classloader.
+   */
+  public <R> R wrapClassLoader(Supplier<R> supplier) {
+    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+      return supplier.get();
+    }
+  }
+
+  /**
+   * Executes the given Runnable using the execution context's classloader as thread classloader.
+   */
+  void wrapClassLoader(Runnable runnable) {
+    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(classLoader)) {
+      runnable.run();
+    }
+  }
+
+  public TableEnvironmentInternal getTableEnvironment() {
+    return tableEnv;
+  }
+
+  /** Returns a builder for this {@link ExecutionContext}. */
+  public static Builder builder(
+      EngineEnvironment defaultEnv,
+      EngineEnvironment sessionEnv,
+      List<URL> dependencies,
+      Configuration configuration,
+      ClusterClientServiceLoader serviceLoader,
+      Options commandLineOptions,
+      List<CustomCommandLine> commandLines) {
+    return new Builder(
+        defaultEnv,
+        sessionEnv,
+        dependencies,
+        configuration,
+        serviceLoader,
+        commandLineOptions,
+        commandLines);
+  }
+
+  // ------------------------------------------------------------------------------------------------------------------
+  // Non-public methods
+  // ------------------------------------------------------------------------------------------------------------------
+
+  private TableEnvironmentInternal createStreamTableEnvironment(
+      StreamExecutionEnvironment env,
+      EnvironmentSettings settings,
+      TableConfig config,
+      Executor executor,
+      CatalogManager catalogManager,
+      ModuleManager moduleManager,
+      FunctionCatalog functionCatalog) {
+    final Map<String, String> plannerProperties = settings.toPlannerProperties();
+    final Planner planner =
+        ComponentFactoryService.find(PlannerFactory.class, plannerProperties)
+            .create(plannerProperties, executor, config, functionCatalog, catalogManager);
+
+    return new StreamTableEnvironmentImpl(
+        catalogManager,
+        moduleManager,
+        functionCatalog,
+        config,
+        env,
+        planner,
+        executor,
+        settings.isStreamingMode(),
+        classLoader);
+  }
+
+  private static Executor lookupExecutor(
+      Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
+    try {
+      ExecutorFactory executorFactory =
+          ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
+      Method createMethod =
+          executorFactory
+              .getClass()
+              .getMethod("create", Map.class, StreamExecutionEnvironment.class);
+
+      return (Executor)
+          createMethod.invoke(executorFactory, executorProperties, executionEnvironment);
+    } catch (Exception e) {
+      throw new TableException(
+          "Could not instantiate the executor. Make sure a planner module is on the classpath", e);
+    }
+  }
+
+  private void initializeTableEnvironment() {
+    final EnvironmentSettings settings = engineEnvironment.getExecution().getEnvironmentSettings();
+    final TableConfig config = new TableConfig();
+    final ModuleManager moduleManager = new ModuleManager();
+    final CatalogManager catalogManager =
+        CatalogManager.newBuilder()
+            .classLoader(classLoader)
+            .config(config.getConfiguration())
+            .defaultCatalog(
+                settings.getBuiltInCatalogName(),
+                new GenericInMemoryCatalog(
+                    settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName()))
+            .build();
+    final FunctionCatalog functionCatalog =
+        new FunctionCatalog(config, catalogManager, moduleManager);
+
+    // Must initialize the table environment before initializing the catalogs.
+    createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);
+
+    // No need to register the catalogs if they are already inherited from the same session.
+    initializeCatalogs();
+  }
+
+  private void createTableEnvironment(
+      EnvironmentSettings settings,
+      TableConfig config,
+      CatalogManager catalogManager,
+      ModuleManager moduleManager,
+      FunctionCatalog functionCatalog) {
+    if (engineEnvironment.getExecution().isStreamingPlanner()) {
+      streamExecEnv = createStreamExecutionEnvironment();
+
+      final Map<String, String> executorProperties = settings.toExecutorProperties();
+      executor = lookupExecutor(executorProperties, streamExecEnv);
+      tableEnv =
+          createStreamTableEnvironment(
+              streamExecEnv,
+              settings,
+              config,
+              executor,
+              catalogManager,
+              moduleManager,
+              functionCatalog);
+    } else {
+      throw new RuntimeException("Unsupported execution type specified.");
+    }
+  }
+
+  private void initializeCatalogs() {
+    // Switch to the current catalog.
+    Optional<String> catalog = engineEnvironment.getExecution().getCurrentCatalog();
+    catalog.ifPresent(tableEnv::useCatalog);
+
+    // Switch to the current database.
+    Optional<String> database = engineEnvironment.getExecution().getCurrentDatabase();
+    database.ifPresent(tableEnv::useDatabase);
+  }
+
+  private StreamExecutionEnvironment createStreamExecutionEnvironment() {
+    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+    env.setRestartStrategy(engineEnvironment.getExecution().getRestartStrategy());
+    env.setParallelism(engineEnvironment.getExecution().getParallelism());
+    env.setMaxParallelism(engineEnvironment.getExecution().getMaxParallelism());
+    env.getConfig()
+        .setAutoWatermarkInterval(engineEnvironment.getExecution().getPeriodicWatermarksInterval());
+    return env;
+  }
+
+  // ~ Inner Class -------------------------------------------------------------------------------
+
+  /** Builder for {@link ExecutionContext}. */
+  public static class Builder {
+    // Required members.
+    private final EngineEnvironment sessionEnv;
+    private final List<URL> dependencies;
+    private final Configuration configuration;
+    private final ClusterClientServiceLoader serviceLoader;
+    private final Options commandLineOptions;
+    private final List<CustomCommandLine> commandLines;
+
+    private EngineEnvironment defaultEnv;
+    private EngineEnvironment currentEnv;
+
+    private Builder(
+        EngineEnvironment defaultEnv,
+        @Nullable EngineEnvironment sessionEnv,
+        List<URL> dependencies,
+        Configuration configuration,
+        ClusterClientServiceLoader serviceLoader,
+        Options commandLineOptions,
+        List<CustomCommandLine> commandLines) {
+      this.defaultEnv = defaultEnv;
+      this.sessionEnv = sessionEnv;
+      this.dependencies = dependencies;
+      this.configuration = configuration;
+      this.serviceLoader = serviceLoader;
+      this.commandLineOptions = commandLineOptions;
+      this.commandLines = commandLines;
+    }
+
+    public Builder env(EngineEnvironment engineEnvironment) {
+      this.currentEnv = engineEnvironment;
+      return this;
+    }
+
+    public ExecutionContext<?> build() {
+      try {
+        return new ExecutionContext<>(
+            this.currentEnv == null
+                ? EngineEnvironment.merge(defaultEnv, sessionEnv)
+                : this.currentEnv,
+            this.dependencies,
+            this.configuration,
+            this.serviceLoader,
+            this.commandLineOptions,
+            this.commandLines);
+      } catch (Throwable t) {
+        // catch everything such that a configuration does not crash the executor
+        throw new RuntimeException("Could not create execution context.", t);
+      }
+    }
+  }
+}
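
A usage sketch (hypothetical; `engineContext` and `sessionEnv` are assumed to
exist, and SessionContext in the next file wires the builder the same way):

    ExecutionContext<?> executionContext =
        ExecutionContext.builder(
                engineContext.getEngineEnv(),   // default environment
                sessionEnv,                     // session environment, may be null
                engineContext.getDependencies(),
                engineContext.getFlinkConfig(),
                engineContext.getClusterClientServiceLoader(),
                engineContext.getCommandLineOptions(),
                engineContext.getCommandLines())
            .build();

    // Flink calls should run under the context's user-code class loader.
    String[] catalogs =
        executionContext.wrapClassLoader(
            () -> executionContext.getTableEnvironment().listCatalogs());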
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/SessionContext.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/SessionContext.java
new file mode 100644
index 0000000..b150942
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/context/SessionContext.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.context;
+
+import java.util.Objects;
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment;
+
+/** Context describing current session properties, original properties, ExecutionContext, etc. */
+public class SessionContext {
+  private final EngineEnvironment engineEnv;
+  private final EngineContext engineContext;
+  private ExecutionContext<?> executionContext;
+
+  public SessionContext(EngineEnvironment engineEnv, EngineContext engineContext) {
+    this.engineEnv = engineEnv;
+    this.engineContext = engineContext;
+    this.executionContext = createExecutionContextBuilder(engineEnv).build();
+  }
+
+  public ExecutionContext<?> getExecutionContext() {
+    return executionContext;
+  }
+
+  /** Returns an {@link ExecutionContext.Builder} for the given session environment. */
+  public ExecutionContext.Builder createExecutionContextBuilder(EngineEnvironment sessionEnv) {
+    return ExecutionContext.builder(
+        engineContext.getEngineEnv(),
+        sessionEnv,
+        engineContext.getDependencies(),
+        engineContext.getFlinkConfig(),
+        engineContext.getClusterClientServiceLoader(),
+        engineContext.getCommandLineOptions(),
+        engineContext.getCommandLines());
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SessionContext)) {
+      return false;
+    }
+    SessionContext context = (SessionContext) o;
+    return Objects.equals(engineEnv, context.engineEnv)
+        && Objects.equals(executionContext, context.executionContext);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(engineEnv, executionContext);
+  }
+}
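
For illustration, a session whose properties change could build a fresh
execution context through the builder above (a hypothetical sketch;
`sessionContext`, `sessionEnv`, and `changedProps` are assumed):

    // Hypothetical: derive an enriched environment and rebuild the context.
    EngineEnvironment newEnv = EngineEnvironment.enrich(sessionEnv, changedProps);
    ExecutionContext<?> refreshed =
        sessionContext.createExecutionContextBuilder(newEnv).build();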
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ColumnInfo.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ColumnInfo.java
new file mode 100644
index 0000000..98982a9
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ColumnInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeParser;
+import org.apache.flink.util.Preconditions;
+
+/** A column info represents a table column's structure, i.e. its column name and type. */
+public class ColumnInfo {
+
+  private String name;
+
+  private String type;
+
+  @Nullable private LogicalType logicalType;
+
+  public ColumnInfo(String name, String type) {
+    this.name = Preconditions.checkNotNull(name, "name must not be null");
+    this.type = Preconditions.checkNotNull(type, "type must not be null");
+  }
+
+  public static ColumnInfo create(String name, LogicalType type) {
+    return new ColumnInfo(name, type.toString());
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  public LogicalType getLogicalType() {
+    if (logicalType == null) {
+      logicalType = LogicalTypeParser.parse(type);
+    }
+    return logicalType;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ColumnInfo that = (ColumnInfo) o;
+    return name.equals(that.name) && type.equals(that.type);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, type);
+  }
+
+  @Override
+  public String toString() {
+    return "ColumnInfo{" + "name='" + name + '\'' + ", type='" + type + '\'' + '}';
+  }
+}
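
A small sketch of the lazy type handling above (the string form round-trips
through Flink's LogicalTypeParser):

    // The type is stored as a string; the LogicalType is parsed on first
    // access and cached.
    ColumnInfo col = ColumnInfo.create("catalogs", new VarCharType(false, 64));
    col.getType();        // e.g. "VARCHAR(64) NOT NULL"
    col.getLogicalType(); // parsed back from the string form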
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
new file mode 100644
index 0000000..6496d84
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+/** Constant column names. */
+public class Constants {
+
+  public static final String SHOW_CATALOGS_RESULT = "catalogs";
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java
new file mode 100644
index 0000000..9224f9d
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/OperationUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.types.Row;
+
+/** Utility class for Flink operations. */
+public class OperationUtil {
+
+  public static ResultSet stringListToResultSet(List<String> strings, String columnName) {
+    List<Row> data = new ArrayList<>();
+    boolean isNullable = false;
+    int maxLength = VarCharType.DEFAULT_LENGTH;
+
+    for (String str : strings) {
+      if (str == null) {
+        isNullable = true;
+      } else {
+        maxLength = Math.max(str.length(), maxLength);
+        data.add(Row.of(str));
+      }
+    }
+
+    return ResultSet.builder()
+        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(ColumnInfo.create(columnName, new VarCharType(isNullable, maxLength)))
+        .data(data.toArray(new Row[0]))
+        .build();
+  }
+}
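
This helper is what a GetCatalogs-style operation can lean on; a hedged sketch
(GetCatalogs.scala itself is not shown in this hunk, and `tableEnv` is
assumed):

    // Hypothetical: expose the catalog names as a one-column ResultSet.
    List<String> catalogs = Arrays.asList(tableEnv.listCatalogs());
    ResultSet resultSet =
        OperationUtil.stringListToResultSet(catalogs, Constants.SHOW_CATALOGS_RESULT);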
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
new file mode 100644
index 0000000..8fb51c2
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultKind.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+/** ResultKind defines the types of the result. */
+public enum ResultKind {
+  // rows with important content are available (DML, DQL)
+  SUCCESS_WITH_CONTENT
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
new file mode 100644
index 0000000..10fc35e
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/ResultSet.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.result;
+
+import com.google.common.collect.Iterators;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+import javax.annotation.Nullable;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.Preconditions;
+import org.apache.kyuubi.operation.ArrayFetchIterator;
+import org.apache.kyuubi.operation.FetchIterator;
+
+/**
+ * The result set of a single statement execution, containing the result kind, column infos,
+ * rows of data, and change flags for streaming mode.
+ */
+public class ResultSet {
+
+  private final ResultKind resultKind;
+  private final List<ColumnInfo> columns;
+  private final FetchIterator<Row> data;
+
+  // null in batch mode
+  //
+  // list of boolean in streaming mode,
+ * true if the corresponding row is an append row, false if it's a retract row
+  private final List<Boolean> changeFlags;
+
+  private ResultSet(
+      ResultKind resultKind,
+      List<ColumnInfo> columns,
+      FetchIterator<Row> data,
+      @Nullable List<Boolean> changeFlags) {
+    this.resultKind = Preconditions.checkNotNull(resultKind, "resultKind must not be null");
+    this.columns = Preconditions.checkNotNull(columns, "columns must not be null");
+    this.data = Preconditions.checkNotNull(data, "data must not be null");
+    if (data.hasNext()) {
+      Preconditions.checkArgument(
+          columns.size() == data.next().getArity(),
+          "the size of columns and the number of fields in the row should be equal");
+    }
+    this.changeFlags = changeFlags;
+    if (changeFlags != null) {
+      Preconditions.checkArgument(
+          Iterators.size((Iterator<?>) data) == changeFlags.size(),
+          "the size of data and the size of changeFlags should be equal");
+    }
+  }
+
+  public List<ColumnInfo> getColumns() {
+    return columns;
+  }
+
+  public FetchIterator<Row> getData() {
+    return data;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ResultSet resultSet = (ResultSet) o;
+    return resultKind.equals(resultSet.resultKind)
+        && columns.equals(resultSet.columns)
+        && data.equals(resultSet.data)
+        && Objects.equals(changeFlags, resultSet.changeFlags);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(resultKind, columns, data, changeFlags);
+  }
+
+  @Override
+  public String toString() {
+    return "ResultSet{"
+        + "resultKind="
+        + resultKind
+        + ", columns="
+        + columns
+        + ", data="
+        + data
+        + ", changeFlags="
+        + changeFlags
+        + '}';
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  /** Builder for {@link ResultSet}. */
+  public static class Builder {
+    private ResultKind resultKind = null;
+    private List<ColumnInfo> columns = null;
+    private FetchIterator<Row> data = null;
+    private List<Boolean> changeFlags = null;
+
+    private Builder() {}
+
+    /** Set {@link ResultKind}. */
+    public Builder resultKind(ResultKind resultKind) {
+      this.resultKind = resultKind;
+      return this;
+    }
+
+    /** Set {@link ColumnInfo}s. */
+    public Builder columns(ColumnInfo... columns) {
+      this.columns = Arrays.asList(columns);
+      return this;
+    }
+
+    /** Set {@link ColumnInfo}s. */
+    public Builder columns(List<ColumnInfo> columns) {
+      this.columns = columns;
+      return this;
+    }
+
+    /** Set data. */
+    public Builder data(FetchIterator<Row> data) {
+      this.data = data;
+      return this;
+    }
+
+    /** Set data. */
+    public Builder data(Row[] data) {
+      this.data = new ArrayFetchIterator<>(data);
+      return this;
+    }
+
+    /** Set change flags. */
+    public Builder changeFlags(List<Boolean> changeFlags) {
+      this.changeFlags = changeFlags;
+      return this;
+    }
+
+    /** Returns a {@link ResultSet} instance. */
+    public ResultSet build() {
+      return new ResultSet(resultKind, columns, data, changeFlags);
+    }
+  }
+}
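
For context, a short sketch of assembling a batch-mode ResultSet via the
builder; the column name, length, and row below are made up for
illustration:

    import org.apache.flink.table.types.logical.VarCharType
    import org.apache.flink.types.Row

    import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultKind, ResultSet}

    // Batch mode: change flags are left unset (null).
    val rs = ResultSet.builder()
      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
      .columns(ColumnInfo.create("catalogs", new VarCharType(false, 32)))
      .data(Array[Row](Row.of("default_catalog")))
      .build()
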
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
new file mode 100644
index 0000000..d0bfd7a
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.io.IOException
+
+import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+
+import org.apache.hive.service.rpc.thrift.{TRowSet, TTableSchema}
+
+import org.apache.kyuubi.{KyuubiSQLException, Utils}
+import org.apache.kyuubi.engine.flink.context.SessionContext
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.RowSet
+import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
+import org.apache.kyuubi.operation.{AbstractOperation, OperationState}
+import org.apache.kyuubi.operation.FetchOrientation.{FETCH_FIRST, FETCH_NEXT, FETCH_PRIOR, FetchOrientation}
+import org.apache.kyuubi.operation.OperationState.OperationState
+import org.apache.kyuubi.operation.OperationType.OperationType
+import org.apache.kyuubi.operation.log.OperationLog
+import org.apache.kyuubi.session.Session
+
+abstract class FlinkOperation(
+    opType: OperationType,
+    session: Session)
+  extends AbstractOperation(opType, session) {
+
+  protected val sessionContext: SessionContext =
+    session.asInstanceOf[FlinkSessionImpl].getSessionContext
+  protected var resultSet: ResultSet = _
+
+  override protected def beforeRun(): Unit = {
+    setHasResultSet(true)
+    setState(OperationState.RUNNING)
+  }
+
+  override protected def afterRun(): Unit = {
+    state.synchronized {
+      if (!isTerminalState(state)) {
+        setState(OperationState.FINISHED)
+      }
+    }
+    OperationLog.removeCurrentOperationLog()
+  }
+
+  override def cancel(): Unit = {
+    cleanup(OperationState.CANCELED)
+  }
+
+  override def close(): Unit = {
+    cleanup(OperationState.CLOSED)
+    try {
+      getOperationLog.foreach(_.close())
+    } catch {
+      case e: IOException =>
+        error(e.getMessage, e)
+    }
+  }
+
+  override def getResultSetSchema: TTableSchema = {
+    val tTableSchema = new TTableSchema()
+    resultSet.getColumns.asScala.zipWithIndex.foreach { case (f, i) =>
+      tTableSchema.addToColumns(RowSet.toTColumnDesc(f, i))
+    }
+    tTableSchema
+  }
+
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+    validateDefaultFetchOrientation(order)
+    assertState(OperationState.FINISHED)
+    setHasResultSet(true)
+    order match {
+      case FETCH_NEXT => resultSet.getData.fetchNext()
+      case FETCH_PRIOR => resultSet.getData.fetchPrior(rowSetSize)
+      case FETCH_FIRST => resultSet.getData.fetchAbsolute(0)
+    }
+    val token = resultSet.getData.take(rowSetSize)
+    val resultRowSet = RowSet.resultSetToTRowSet(
+      token.toList,
+      resultSet,
+      getProtocolVersion)
+    resultRowSet.setStartRowOffset(resultSet.getData.getPosition)
+    resultRowSet
+  }
+
+  override def shouldRunAsync: Boolean = false
+
+  protected def cleanup(targetState: OperationState): Unit = state.synchronized {
+    if (!isTerminalState(state)) {
+      setState(targetState)
+      if (shouldRunAsync) {
+        Option(getBackgroundHandle).foreach(_.cancel(true))
+      }
+    }
+  }
+
+  protected def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
+    // We should use Throwable instead of Exception since `java.lang.NoClassDefFoundError`
+    // could be thrown.
+    case e: Throwable =>
+      state.synchronized {
+        val errMsg = Utils.stringifyException(e)
+        if (state == OperationState.TIMEOUT) {
+          val ke = KyuubiSQLException(s"Timeout operating $opType: $errMsg")
+          setOperationException(ke)
+          throw ke
+        } else if (isTerminalState(state)) {
+          setOperationException(KyuubiSQLException(errMsg))
+          warn(s"Ignore exception in terminal state with $statementId: $errMsg")
+        } else {
+          setState(OperationState.ERROR)
+          error(s"Error operating $opType: $errMsg", e)
+          val ke = KyuubiSQLException(s"Error operating $opType: $errMsg", e)
+          setOperationException(ke)
+          throw ke
+        }
+      }
+  }
+
+}
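
A sketch of what a concrete subclass looks like, assuming a hypothetical
NoopOperation that returns a constant single-row result; state transitions
and error translation come from the base class:

    import java.util.Collections

    import org.apache.kyuubi.engine.flink.result.OperationUtil
    import org.apache.kyuubi.operation.OperationType
    import org.apache.kyuubi.session.Session

    class NoopOperation(session: Session)
      extends FlinkOperation(OperationType.EXECUTE_STATEMENT, session) {

      // runInternal only needs to populate `resultSet`; failures are
      // routed through onError() into KyuubiSQLException.
      override protected def runInternal(): Unit = {
        try {
          resultSet = OperationUtil.stringListToResultSet(
            Collections.singletonList("ok"), "result")
        } catch onError()
      }
    }
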
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
new file mode 100644
index 0000000..df83949
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util
+
+import org.apache.kyuubi.operation.{Operation, OperationManager}
+import org.apache.kyuubi.session.Session
+
+class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManager") {
+
+  override def newExecuteStatementOperation(
+      session: Session,
+      statement: String,
+      runAsync: Boolean,
+      queryTimeout: Long): Operation = null
+
+  override def newGetTypeInfoOperation(session: Session): Operation = null
+
+  override def newGetCatalogsOperation(session: Session): Operation = {
+    val op = new GetCatalogs(session)
+    addOperation(op)
+  }
+
+  override def newGetSchemasOperation(
+      session: Session,
+      catalog: String,
+      schema: String): Operation = null
+
+  override def newGetTablesOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      tableTypes: util.List[String]): Operation = null
+
+  override def newGetTableTypesOperation(session: Session): Operation = null
+
+  override def newGetColumnsOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      columnName: String): Operation = null
+
+  override def newGetFunctionsOperation(
+      session: Session,
+      catalogName: String,
+      schemaName: String,
+      functionName: String): Operation = null
+
+}
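
Driving the manager directly looks roughly like the following, assuming
`session` is a FlinkSessionImpl backed by a live SessionContext; GetCatalogs
is the only operation the manager can create so far:

    val manager = new FlinkSQLOperationManager()
    val op = manager.newGetCatalogsOperation(session)
    op.run()
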
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
new file mode 100644
index 0000000..3a7a974
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class GetCatalogs(session: Session)
+  extends FlinkOperation(OperationType.GET_CATALOGS, session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      val catalogs: java.util.List[String] =
+        tableEnv.listCatalogs.toList.asJava
+      resultSet = OperationUtil.stringListToResultSet(
+        catalogs,
+        Constants.SHOW_CATALOGS_RESULT)
+    } catch onError()
+  }
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
new file mode 100644
index 0000000..069ade7
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/RowSet.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.schema
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter, seqAsJavaListConverter}
+import scala.language.implicitConversions
+
+import org.apache.flink.table.types.logical._
+import org.apache.flink.types.Row
+import org.apache.hive.service.rpc.thrift._
+
+import org.apache.kyuubi.engine.flink.result.{ColumnInfo, ResultSet}
+
+object RowSet {
+
+  def resultSetToTRowSet(
+      rows: Seq[Row],
+      resultSet: ResultSet,
+      protocolVersion: TProtocolVersion): TRowSet = {
+    if (protocolVersion.getValue < TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+      toRowBaseSet(rows, resultSet)
+    } else {
+      toColumnBasedSet(rows, resultSet)
+    }
+  }
+
+  def toRowBaseSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+    val tRows = rows.map { row =>
+      val tRow = new TRow()
+      (0 until row.getArity).map(i => toTColumnValue(i, row, resultSet))
+        .foreach(tRow.addToColVals)
+      tRow
+    }.asJava
+
+    new TRowSet(0, tRows)
+  }
+
+  def toColumnBasedSet(rows: Seq[Row], resultSet: ResultSet): TRowSet = {
+    val size = rows.length
+    val tRowSet = new TRowSet(0, new util.ArrayList[TRow](size))
+    resultSet.getColumns.asScala.zipWithIndex.foreach { case (field, i) =>
+      val tColumn = toTColumn(rows, i, field.getLogicalType)
+      tRowSet.addToColumns(tColumn)
+    }
+    tRowSet
+  }
+
+  private def toTColumnValue(
+      ordinal: Int,
+      row: Row,
+      resultSet: ResultSet): TColumnValue = {
+
+    val logicalType = resultSet.getColumns.get(ordinal).getLogicalType
+
+    if (logicalType.isInstanceOf[BooleanType]) {
+      val boolValue = new TBoolValue
+      if (row.getField(ordinal) != null) {
+        boolValue.setValue(row.getField(ordinal).asInstanceOf[Boolean])
+      }
+      TColumnValue.boolVal(boolValue)
+    } else if (logicalType.isInstanceOf[TinyIntType]) {
+      val tByteValue = new TByteValue
+      if (row.getField(ordinal) != null) {
+        tByteValue.setValue(row.getField(ordinal).asInstanceOf[Byte])
+      }
+      TColumnValue.byteVal(tByteValue)
+    } else if (logicalType.isInstanceOf[IntType]) {
+      val tI32Value = new TI32Value
+      if (row.getField(ordinal) != null) {
+        tI32Value.setValue(row.getField(ordinal).asInstanceOf[Int])
+      }
+      TColumnValue.i32Val(tI32Value)
+    } else if (logicalType.isInstanceOf[BigIntType]) {
+      val tI64Value = new TI64Value
+      if (row.getField(ordinal) != null) {
+        tI64Value.setValue(row.getField(ordinal).asInstanceOf[Long])
+      }
+      TColumnValue.i64Val(tI64Value)
+    } else if (logicalType.isInstanceOf[FloatType]) {
+      val tDoubleValue = new TDoubleValue
+      if (row.getField(ordinal) != null) {
+        tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Float])
+      }
+      TColumnValue.doubleVal(tDoubleValue)
+    } else if (logicalType.isInstanceOf[DoubleType]) {
+      val tDoubleValue = new TDoubleValue
+      if (row.getField(ordinal) != null) {
+        tDoubleValue.setValue(row.getField(ordinal).asInstanceOf[Double])
+      }
+      TColumnValue.doubleVal(tDoubleValue)
+    } else if (logicalType.isInstanceOf[VarCharType]) {
+      val tStringValue = new TStringValue
+      if (row.getField(ordinal) != null) {
+        tStringValue.setValue(row.getField(ordinal).asInstanceOf[String])
+      }
+      TColumnValue.stringVal(tStringValue)
+    } else {
+      val tStrValue = new TStringValue
+      if (row.getField(ordinal) != null) {
+        // TODO: support conversion for other types
+      }
+      TColumnValue.stringVal(tStrValue)
+    }
+  }
+
+  implicit private def bitSetToBuffer(bitSet: java.util.BitSet): ByteBuffer = {
+    ByteBuffer.wrap(bitSet.toByteArray)
+  }
+
+  private def toTColumn(
+      rows: Seq[Row],
+      ordinal: Int,
+      logicalType: LogicalType): TColumn = {
+    val nulls = new java.util.BitSet()
+    if (logicalType.isInstanceOf[BooleanType]) {
+      val values = getOrSetAsNull[java.lang.Boolean](
+        rows,
+        ordinal,
+        nulls,
+        true)
+      TColumn.boolVal(new TBoolColumn(values, nulls))
+    } else if (logicalType.isInstanceOf[TinyIntType]) {
+      val values = getOrSetAsNull[java.lang.Byte](
+        rows,
+        ordinal,
+        nulls,
+        0.toByte)
+      TColumn.byteVal(new TByteColumn(values, nulls))
+    } else if (logicalType.isInstanceOf[VarCharType]) {
+      val values = getOrSetAsNull[java.lang.String](
+        rows,
+        ordinal,
+        nulls,
+        "")
+      TColumn.stringVal(new TStringColumn(values, nulls))
+    } else if (logicalType.isInstanceOf[CharType]) {
+      val values = getOrSetAsNull[java.lang.String](
+        rows,
+        ordinal,
+        nulls,
+        "")
+      TColumn.stringVal(new TStringColumn(values, nulls))
+    } else {
+      null
+    }
+  }
+
+  private def getOrSetAsNull[T](
+      rows: Seq[Row],
+      ordinal: Int,
+      nulls: java.util.BitSet,
+      defaultVal: T): java.util.List[T] = {
+    val size = rows.length
+    val ret = new java.util.ArrayList[T](size)
+    var idx = 0
+    while (idx < size) {
+      val row = rows(idx)
+      val isNull = row.getField(ordinal) == null
+      if (isNull) {
+        nulls.set(idx, true)
+        ret.add(idx, defaultVal)
+      } else {
+        ret.add(idx, row.getField(ordinal).asInstanceOf[T])
+      }
+      idx += 1
+    }
+    ret
+  }
+
+  def toTColumnDesc(field: ColumnInfo, pos: Int): TColumnDesc = {
+    val tColumnDesc = new TColumnDesc()
+    tColumnDesc.setColumnName(field.getName)
+    tColumnDesc.setTypeDesc(toTTypeDesc(field.getLogicalType))
+    tColumnDesc.setComment("")
+    tColumnDesc.setPosition(pos)
+    tColumnDesc
+  }
+
+  def toTTypeDesc(typ: LogicalType): TTypeDesc = {
+    val typeEntry = new TPrimitiveTypeEntry(toTTypeId(typ))
+    typeEntry.setTypeQualifiers(toTTypeQualifiers(typ))
+    val tTypeDesc = new TTypeDesc()
+    tTypeDesc.addToTypes(TTypeEntry.primitiveEntry(typeEntry))
+    tTypeDesc
+  }
+
+  def toTTypeQualifiers(typ: LogicalType): TTypeQualifiers = {
+    val ret = new TTypeQualifiers()
+    val qualifiers = typ match {
+      case d: DecimalType =>
+        Map(
+          TCLIServiceConstants.PRECISION -> TTypeQualifierValue.i32Value(d.getPrecision),
+          TCLIServiceConstants.SCALE -> TTypeQualifierValue.i32Value(d.getScale)).asJava
+      case _ => Collections.emptyMap[String, TTypeQualifierValue]()
+    }
+    ret.setQualifiers(qualifiers)
+    ret
+  }
+
+  def toTTypeId(typ: LogicalType): TTypeId =
+    if (typ.isInstanceOf[NullType]) {
+      TTypeId.NULL_TYPE
+    } else if (typ.isInstanceOf[BooleanType]) {
+      TTypeId.BOOLEAN_TYPE
+    } else if (typ.isInstanceOf[FloatType]) {
+      TTypeId.FLOAT_TYPE
+    } else if (typ.isInstanceOf[DoubleType]) {
+      TTypeId.DOUBLE_TYPE
+    } else if (typ.isInstanceOf[VarCharType]) {
+      TTypeId.STRING_TYPE
+    } else if (typ.isInstanceOf[CharType]) {
+      TTypeId.STRING_TYPE
+    } else {
+      null
+    }
+
+}
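
A round-trip sketch: build a result set, then render it for the wire. The
protocol version decides the layout, so V6+ clients receive column-based
sets while older clients get row-based ones:

    import org.apache.flink.types.Row
    import org.apache.hive.service.rpc.thrift.TProtocolVersion

    import org.apache.kyuubi.engine.flink.result.{Constants, OperationUtil}
    import org.apache.kyuubi.engine.flink.schema.RowSet

    val resultSet = OperationUtil.stringListToResultSet(
      java.util.Arrays.asList("default_catalog"), Constants.SHOW_CATALOGS_RESULT)
    val rows: Seq[Row] = Seq(Row.of("default_catalog"))
    val tRowSet = RowSet.resultSetToTRowSet(
      rows, resultSet, TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6)
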
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
new file mode 100644
index 0000000..305ef10
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.session
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.engine.flink.context.EngineContext
+import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
+import org.apache.kyuubi.session.{SessionHandle, SessionManager}
+
+class FlinkSQLSessionManager(engineContext: EngineContext)
+  extends SessionManager("FlinkSQLSessionManager") {
+
+  override protected def isServer: Boolean = false
+
+  val operationManager = new FlinkSQLOperationManager()
+
+  override def openSession(
+      protocol: TProtocolVersion,
+      user: String,
+      password: String,
+      ipAddress: String,
+      conf: Map[String, String]): SessionHandle = null
+
+  override def closeSession(sessionHandle: SessionHandle): Unit = {}
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
new file mode 100644
index 0000000..eda799f
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.session
+
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.engine.flink.context.SessionContext
+import org.apache.kyuubi.session.{AbstractSession, SessionHandle, SessionManager}
+
+class FlinkSessionImpl(
+    protocol: TProtocolVersion,
+    user: String,
+    password: String,
+    ipAddress: String,
+    conf: Map[String, String],
+    sessionManager: SessionManager,
+    sessionContext: SessionContext)
+  extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
+
+  override val handle: SessionHandle = SessionHandle(protocol)
+
+  def getSessionContext: SessionContext = sessionContext
+
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
new file mode 100644
index 0000000..d95c93e
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import java.util.Collections
+
+import scala.collection.JavaConverters.mapAsJavaMapConverter
+
+import org.apache.flink.client.cli.DefaultCLI
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader
+import org.apache.flink.configuration.Configuration
+import org.apache.hive.service.rpc.thrift.TProtocolVersion
+
+import org.apache.kyuubi.{KyuubiFunSuite, Utils}
+import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.engine.flink.config.EngineEnvironment
+import org.apache.kyuubi.engine.flink.config.entries.ExecutionEntry
+import org.apache.kyuubi.engine.flink.context.{EngineContext, SessionContext}
+import org.apache.kyuubi.engine.flink.session.{FlinkSessionImpl, FlinkSQLSessionManager}
+import org.apache.kyuubi.operation.FetchOrientation
+
+class FlinkOperationSuite extends KyuubiFunSuite {
+
+  val user: String = Utils.currentUser
+  val password = "anonymous"
+
+  var engineEnv = new EngineEnvironment
+  var engineContext = new EngineContext(
+    engineEnv,
+    Collections.emptyList(),
+    new Configuration,
+    new DefaultCLI,
+    new DefaultClusterClientServiceLoader)
+  var sessionContext: SessionContext = _
+  var flinkSession: FlinkSessionImpl = _
+
+  override def beforeAll(): Unit = {
+    engineEnv = EngineEnvironment.enrich(
+      engineContext.getEngineEnv,
+      Map(EngineEnvironment.EXECUTION_ENTRY + "." + ExecutionEntry.EXECUTION_TYPE
+        -> ExecutionEntry.EXECUTION_TYPE_VALUE_BATCH).asJava)
+    sessionContext = new SessionContext(engineEnv, engineContext)
+    val flinkSQLSessionManager = new FlinkSQLSessionManager(engineContext)
+    flinkSQLSessionManager.initialize(KyuubiConf())
+    flinkSession = new FlinkSessionImpl(
+      TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1,
+      user,
+      password,
+      "localhost",
+      Map(),
+      flinkSQLSessionManager,
+      sessionContext)
+    super.beforeAll()
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+  }
+
+  test("get catalogs for flink sql") {
+    val getCatalogOperation = new GetCatalogs(flinkSession)
+    getCatalogOperation.run()
+
+    val resultSet = getCatalogOperation.getNextRowSet(FetchOrientation.FETCH_FIRST, 10)
+    assert(1 == resultSet.getRowsSize)
+    assert(resultSet.getRows.get(0).getColVals().get(0).getStringVal.getValue === "default_catalog")
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index 86ea42b..f0b36ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -988,6 +988,80 @@
                 <artifactId>swagger-ui</artifactId>
                 <version>${swagger-ui.version}</version>
             </dependency>
+
+            <!-- flink -->
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-core</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-clients_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-common</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-api-java</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+                <scope>provided</scope>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-sql-parser</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-yarn_${scala.binary.version}</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
         </dependencies>
     </dependencyManagement>