Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/03/19 00:42:02 UTC

[zeppelin] branch master updated: [ZEPPELIN-5659] Refactor FlinkSqlInterpreter (#4304)

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d4561b  [ZEPPELIN-5659] Refactor FlinkSqlInterpreter (#4304)
9d4561b is described below

commit 9d4561ba331bba6d8959775691796621faaf72bd
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Mar 19 08:41:52 2022 +0800

    [ZEPPELIN-5659] Refactor FlinkSqlInterpreter (#4304)
    
    * [ZEPPELIN-5659] Refactor FlinkSqlInterpreter
    
    * minor update
    
    * revert changes in zeppelin-server/pom.xml
---
 flink/flink-scala-parent/pom.xml                   |   4 +-
 .../zeppelin/flink/FlinkBatchSqlInterpreter.java   |  44 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |   6 +-
 .../apache/zeppelin/flink/FlinkSqlInterpreter.java | 497 +----------------
 .../zeppelin/flink/FlinkStreamSqlInterpreter.java  |  61 ++-
 .../flink/FlinkBatchSqlInterpreterTest.java        | 119 ++---
 .../zeppelin/flink/FlinkInterpreterTest.java       |  55 +-
 ...reterTest.java => FlinkSqlInterpreterTest.java} |  86 +--
 .../flink/FlinkStreamSqlInterpreterTest.java       | 145 +++--
 .../src/test/resources/init_stream.scala           |   9 +-
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  44 +-
 .../org/apache/zeppelin/flink/FlinkSqlContext.java |  73 +++
 .../zeppelin/flink/sql/SqlCommandParser.java       | 250 ---------
 .../org/apache/zeppelin/flink/Flink112Shims.java   | 306 +----------
 .../zeppelin/flink/Flink112SqlInterpreter.java     | 584 ++++++++++++++++++++
 .../zeppelin/flink/shims112/SqlCommandParser.java  | 355 +++++++++++++
 .../org/apache/zeppelin/flink/Flink113Shims.java   | 301 +----------
 .../zeppelin/flink/Flink113SqlInterpreter.java     | 559 ++++++++++++++++++++
 flink/flink1.14-shims/pom.xml                      |   7 +
 .../org/apache/zeppelin/flink/Flink114Shims.java   | 300 +----------
 .../zeppelin/flink/Flink114SqlInterpreter.java     | 587 +++++++++++++++++++++
 21 files changed, 2538 insertions(+), 1854 deletions(-)

diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index 83a7ec7..efebced 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -560,7 +560,7 @@
     <dependency>
       <groupId>net.jodah</groupId>
       <artifactId>concurrentunit</artifactId>
-      <version>0.4.4</version>
+      <version>0.4.6</version>
       <scope>test</scope>
     </dependency>
 
@@ -791,7 +791,7 @@
           <!-- set sun.zip.disableMemoryMapping=true because of
           https://blogs.oracle.com/poonam/crashes-in-zipgetentry
           https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
-          <argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+          <argLine>-Xmx5120m -XX:MaxMetaspaceSize=1024m -Dsun.zip.disableMemoryMapping=true</argLine>
 <!--          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
 
           <environmentVariables>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index d10a9ea..dea35bf 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -17,58 +17,36 @@
 
 package org.apache.zeppelin.flink;
 
-import org.apache.flink.table.api.Table;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 
-import java.io.IOException;
 import java.util.Properties;
 
 public class FlinkBatchSqlInterpreter extends FlinkSqlInterpreter {
 
-  private ZeppelinContext z;
-
   public FlinkBatchSqlInterpreter(Properties properties) {
     super(properties);
   }
 
   @Override
-  protected boolean isBatch() {
-    return true;
-  }
-
-  @Override
   public void open() throws InterpreterException {
-    this.flinkInterpreter =
-            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
-    this.tbenv = flinkInterpreter.getJavaBatchTableEnvironment("blink");
-    this.z = flinkInterpreter.getZeppelinContext();
     super.open();
+    FlinkSqlContext flinkSqlContext = new FlinkSqlContext(
+            flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
+            flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
+            flinkInterpreter.getJavaBatchTableEnvironment("blink"),
+            flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+            flinkInterpreter.getZeppelinContext(),
+            null);
+    flinkInterpreter.getFlinkShims().initInnerBatchSqlInterpreter(flinkSqlContext);
   }
 
   @Override
-  public void close() throws InterpreterException {
-
-  }
-
-  @Override
-  public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
-    Table table = this.tbenv.sqlQuery(sql);
-    String result = z.showData(table);
-    context.out.write(result);
-  }
-
-  @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    flinkInterpreter.cancel(context);
-  }
-
-  @Override
-  public FormType getFormType() throws InterpreterException {
-    return FormType.SIMPLE;
+  public InterpreterResult runSqlList(String st, InterpreterContext context) {
+    return flinkShims.runSqlList(st, context, true);
   }
 
   @Override
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d629ba4..7e047cc 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -22,11 +22,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
 import org.apache.flink.table.api.TableEnvironment;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
index c3ec4cd..8c55765 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
@@ -18,81 +18,30 @@
 
 package org.apache.zeppelin.flink;
 
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.interpreter.AbstractInterpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.apache.zeppelin.interpreter.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterpreter.class);
 
   protected FlinkInterpreter flinkInterpreter;
-  protected TableEnvironment tbenv;
-  private SqlCommandParser sqlCommandParser;
-  private SqlSplitter sqlSplitter;
-  private int defaultSqlParallelism;
-  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
-  // all the available sql config options. see
-  // https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/
-  private Map<String, ConfigOption> tableConfigOptions;
+  protected FlinkShims flinkShims;
+  protected ZeppelinContext z;
+
 
   public FlinkSqlInterpreter(Properties properties) {
     super(properties);
   }
 
-  protected abstract boolean isBatch();
-
   @Override
   public void open() throws InterpreterException {
-    this.sqlCommandParser = new SqlCommandParser(flinkInterpreter.getFlinkShims(), tbenv);
-    this.sqlSplitter = new SqlSplitter();
-    JobListener jobListener = new JobListener() {
-      @Override
-      public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
-        if (lock.isHeldByCurrentThread()) {
-          lock.unlock();
-          LOGGER.info("UnLock JobSubmitLock");
-        }
-      }
-
-      @Override
-      public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
-
-      }
-    };
-
-    flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
-    flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
-    this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
-    this.tableConfigOptions = flinkInterpreter.getFlinkShims().extractTableConfigOptions();
+    this.flinkInterpreter =
+            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+    this.flinkShims = flinkInterpreter.getFlinkShims();
   }
 
   @Override
@@ -121,435 +70,19 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
     }
   }
 
-  private InterpreterResult runSqlList(String st, InterpreterContext context) {
-
-    try {
-      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
-      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
-      boolean isFirstInsert = true;
-      boolean hasInsert = false;
-      for (String sql : sqls) {
-        Optional<SqlCommandParser.SqlCommandCall> sqlCommand = sqlCommandParser.parse(sql);
-        if (!sqlCommand.isPresent()) {
-          try {
-            context.out.write("%text Invalid Sql statement: " + sql + "\n");
-            context.out.write(flinkInterpreter.getFlinkShims().sqlHelp());
-          } catch (IOException e) {
-            return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
-          }
-          return new InterpreterResult(InterpreterResult.Code.ERROR);
-        }
-        try {
-          if (sqlCommand.get().command == SqlCommand.INSERT_INTO ||
-                  sqlCommand.get().command == SqlCommand.INSERT_OVERWRITE) {
-            hasInsert = true;
-            if (isFirstInsert && runAsOne) {
-              flinkInterpreter.getFlinkShims().startMultipleInsert(tbenv, context);
-              isFirstInsert = false;
-            }
-          }
-          callCommand(sqlCommand.get(), context);
-          context.out.flush();
-        } catch (Throwable e) {
-          LOGGER.error("Fail to run sql:" + sql, e);
-          try {
-            context.out.write("%text Fail to run sql command: " +
-                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
-          } catch (IOException ex) {
-            LOGGER.warn("Unexpected exception:", ex);
-            return new InterpreterResult(InterpreterResult.Code.ERROR,
-                    ExceptionUtils.getStackTrace(e));
-          }
-          return new InterpreterResult(InterpreterResult.Code.ERROR);
-        }
-      }
-
-      if (runAsOne && hasInsert) {
-        try {
-          lock.lock();
-          String jobName = context.getStringLocalProperty("jobName", st);
-          if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) {
-            context.out.write("Insertion successfully.\n");
-          }
-        } catch (Exception e) {
-          LOGGER.error("Fail to execute sql as one job", e);
-          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
-        } finally {
-          if (lock.isHeldByCurrentThread()) {
-            lock.unlock();
-          }
-        }
-      }
-    } catch(Exception e) {
-      LOGGER.error("Fail to execute sql", e);
-      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
-    } finally {
-      // reset parallelism
-      this.tbenv.getConfig().getConfiguration()
-              .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
-                      defaultSqlParallelism);
-      // reset table config
-      for (ConfigOption configOption: tableConfigOptions.values()) {
-        // some may has no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
-        if (configOption.defaultValue() != null) {
-          this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
-        }
-      }
-      this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
-    }
-
-    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
-  }
-
-  private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
-                                        InterpreterContext context) throws Exception {
-    switch (cmdCall.command) {
-      case HELP:
-        callHelp(context);
-        break;
-      case SHOW_CATALOGS:
-        callShowCatalogs(context);
-        break;
-      case SHOW_DATABASES:
-        callShowDatabases(context);
-        break;
-      case SHOW_TABLES:
-        callShowTables(context);
-        break;
-      case SOURCE:
-        callSource(cmdCall.operands[0], context);
-        break;
-      case CREATE_FUNCTION:
-        callCreateFunction(cmdCall.operands[0], context);
-        break;
-      case DROP_FUNCTION:
-        callDropFunction(cmdCall.operands[0], context);
-        break;
-      case ALTER_FUNCTION:
-        callAlterFunction(cmdCall.operands[0], context);
-        break;
-      case SHOW_FUNCTIONS:
-        callShowFunctions(context);
-        break;
-      case SHOW_MODULES:
-        callShowModules(context);
-        break;
-      case USE_CATALOG:
-        callUseCatalog(cmdCall.operands[0], context);
-        break;
-      case USE:
-        callUseDatabase(cmdCall.operands[0], context);
-        break;
-      case CREATE_CATALOG:
-        callCreateCatalog(cmdCall.operands[0], context);
-        break;
-      case DROP_CATALOG:
-        callDropCatalog(cmdCall.operands[0], context);
-        break;
-      case DESC:
-      case DESCRIBE:
-        callDescribe(cmdCall.operands[0], context);
-        break;
-      case EXPLAIN:
-        callExplain(cmdCall.operands[0], context);
-        break;
-      case SELECT:
-        callSelect(cmdCall.operands[0], context);
-        break;
-      case SET:
-        callSet(cmdCall.operands[0], cmdCall.operands[1], context);
-        break;
-      case INSERT_INTO:
-      case INSERT_OVERWRITE:
-        callInsertInto(cmdCall.operands[0], context);
-        break;
-      case CREATE_TABLE:
-        callCreateTable(cmdCall.operands[0], context);
-        break;
-      case DROP_TABLE:
-        callDropTable(cmdCall.operands[0], context);
-        break;
-      case CREATE_VIEW:
-        callCreateView(cmdCall, context);
-        break;
-      case DROP_VIEW:
-        callDropView(cmdCall, context);
-        break;
-      case CREATE_DATABASE:
-        callCreateDatabase(cmdCall.operands[0], context);
-        break;
-      case DROP_DATABASE:
-        callDropDatabase(cmdCall.operands[0], context);
-        break;
-      case ALTER_DATABASE:
-        callAlterDatabase(cmdCall.operands[0], context);
-        break;
-      case ALTER_TABLE:
-        callAlterTable(cmdCall.operands[0], context);
-        break;
-      default:
-        throw new Exception("Unsupported command: " + cmdCall.command);
-    }
-  }
-
-  private void callAlterTable(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      this.tbenv.sqlUpdate(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("Table has been modified.\n");
-  }
-
-  private void callAlterDatabase(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      this.tbenv.sqlUpdate(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("Database has been modified.\n");
-  }
-
-  private void callDropDatabase(String sql, InterpreterContext context) throws IOException {
-    try {
-      this.tbenv.sqlUpdate(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("Database has been dropped.\n");
-  }
-
-  private void callCreateDatabase(String sql, InterpreterContext context) throws IOException {
-    try {
-      this.tbenv.sqlUpdate(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("Database has been created.\n");
-  }
-
-  private void callDropView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("View has been dropped.\n");
-  }
-
-  private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("View has been created.\n");
-  }
-
-  private void callCreateTable(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      this.tbenv.sqlUpdate(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("Table has been created.\n");
-  }
-
-  private void callDropTable(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      this.tbenv.sqlUpdate(sql);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-    context.out.write("Table has been dropped.\n");
-  }
+  public abstract InterpreterResult runSqlList(String st, InterpreterContext context);
 
-  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
-    this.tbenv.useCatalog(catalog);
-  }
-
-  private void callCreateCatalog(String sql, InterpreterContext context) throws IOException {
-    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
-    context.out.write("Catalog has been created.\n");
-  }
-
-  private void callDropCatalog(String sql, InterpreterContext context) throws IOException {
-    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
-    context.out.write("Catalog has been dropped.\n");
-  }
-
-  private void callShowModules(InterpreterContext context) throws IOException {
-    String[] modules = this.tbenv.listModules();
-    context.out.write("%table module\n" + StringUtils.join(modules, "\n") + "\n");
-  }
-
-  private void callHelp(InterpreterContext context) throws IOException {
-    context.out.write(flinkInterpreter.getFlinkShims().sqlHelp());
-  }
-
-  private void callShowCatalogs(InterpreterContext context) throws IOException {
-    String[] catalogs = this.tbenv.listCatalogs();
-    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
-  }
-
-  private void callShowDatabases(InterpreterContext context) throws IOException {
-    String[] databases = this.tbenv.listDatabases();
-    context.out.write(
-            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
-  }
-
-  private void callShowTables(InterpreterContext context) throws IOException {
-    List<String> tables =
-            Arrays.asList(this.tbenv.listTables()).stream()
-                    .filter(tbl -> !tbl.startsWith("UnnamedTable")).collect(Collectors.toList());
-    context.out.write(
-            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
-  }
-
-  private void callSource(String sqlFile, InterpreterContext context) throws IOException {
-    String sql = IOUtils.toString(new FileInputStream(sqlFile));
-    runSqlList(sql, context);
-  }
-
-  private void callCreateFunction(String sql, InterpreterContext context) throws IOException {
-    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
-    context.out.write("Function has been created.\n");
-  }
-
-  private void callDropFunction(String sql, InterpreterContext context) throws IOException {
-    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
-    context.out.write("Function has been dropped.\n");
-  }
-
-  private void callAlterFunction(String sql, InterpreterContext context) throws IOException {
-    flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
-    context.out.write("Function has been modified.\n");
-  }
-
-  private void callShowFunctions(InterpreterContext context) throws IOException {
-    String[] functions = this.tbenv.listUserDefinedFunctions();
-    context.out.write(
-            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
-  }
-
-  private void callUseDatabase(String databaseName,
-                               InterpreterContext context) throws IOException {
-    tbenv.useDatabase(databaseName);
-  }
-
-  private void callDescribe(String name, InterpreterContext context) throws IOException {
-    TableSchema schema = tbenv.scan(name.split("\\.")).getSchema();
-    StringBuilder builder = new StringBuilder();
-    builder.append("Column\tType\n");
-    for (int i = 0; i < schema.getFieldCount(); ++i) {
-      builder.append(schema.getFieldName(i).get() + "\t" + schema.getFieldDataType(i).get() + "\n");
-    }
-    context.out.write("%table\n" + builder.toString());
-  }
-
-  private void callExplain(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      context.out.write(this.flinkInterpreter.getFlinkShims().explain(tbenv, sql) + "\n");
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-  }
-
-  public void callSelect(String sql, InterpreterContext context) throws IOException {
-    try {
-      lock.lock();
-      callInnerSelect(sql, context);
-    } finally {
-      if (lock.isHeldByCurrentThread()) {
-        lock.unlock();
-      }
-    }
-  }
-
-  public abstract void callInnerSelect(String sql, InterpreterContext context) throws IOException;
-
-  private String removeSingleQuote(String value) {
-    value = value.trim();
-    if (value.startsWith("'")) {
-      value = value.substring(1);
-    }
-    if (value.endsWith("'")) {
-      value = value.substring(0, value.length() - 1);
-    }
-    return value;
-  }
-
-  public void callSet(String key, String value, InterpreterContext context) throws Exception {
-    key = removeSingleQuote(key);
-    value = removeSingleQuote(value);
-
-    if ("execution.runtime-mode".equals(key)) {
-      throw new UnsupportedOperationException("execution.runtime-mode is not supported to set, " +
-              "you can use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode");
-    }
-
-    if (!tableConfigOptions.containsKey(key)) {
-      throw new IOException(key + " is not a valid table/sql config, please check link: " +
-              "https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/");
-    }
-
-    LOGGER.info("Set table config: {}={}", key, value);
-    this.tbenv.getConfig().getConfiguration().setString(key, value);
+  @Override
+  public void cancel(InterpreterContext context) throws InterpreterException {
+    flinkInterpreter.cancel(context);
   }
 
-  public void callInsertInto(String sql,
-                              InterpreterContext context) throws IOException {
-     if (!isBatch()) {
-       context.getLocalProperties().put("flink.streaming.insert_into", "true");
-     }
-     try {
-       lock.lock();
-       boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
-       if (!runAsOne) {
-         this.tbenv.sqlUpdate(sql);
-         String jobName = context.getStringLocalProperty("jobName", sql);
-         this.tbenv.execute(jobName);
-         context.out.write("Insertion successfully.\n");
-       } else {
-         flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context);
-       }
-     } catch (Exception e) {
-       throw new IOException(e);
-     } finally {
-       if (lock.isHeldByCurrentThread()) {
-         lock.unlock();
-       }
-     }
+  @Override
+  public FormType getFormType() throws InterpreterException {
+    return FormType.SIMPLE;
   }
 
   @Override
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    this.flinkInterpreter.cancel(context);
+  public void close() throws InterpreterException {
   }
-
 }
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index 23aadf2..6bffb23 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -21,9 +21,9 @@ package org.apache.zeppelin.flink;
 import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
 import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
 import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
-import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.scheduler.Scheduler;
 import org.apache.zeppelin.scheduler.SchedulerFactory;
 
@@ -37,35 +37,35 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
   }
 
   @Override
-  protected boolean isBatch() {
-    return false;
-  }
-
-  @Override
   public void open() throws InterpreterException {
-    this.flinkInterpreter =
-            getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
-    this.tbenv = flinkInterpreter.getJavaStreamTableEnvironment("blink");
     super.open();
-  }
-
-  @Override
-  public void close() throws InterpreterException {
+    FlinkSqlContext flinkSqlContext = new FlinkSqlContext(
+            flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
+            flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
+            flinkInterpreter.getJavaBatchTableEnvironment("blink"),
+            flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+            flinkInterpreter.getZeppelinContext(),
+            sql -> callInnerSelect(sql));
 
+    flinkInterpreter.getFlinkShims().initInnerStreamSqlInterpreter(flinkSqlContext);
   }
 
-  @Override
-  public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
+  public void callInnerSelect(String sql) {
+    InterpreterContext context = InterpreterContext.get();
     String streamType = context.getLocalProperties().getOrDefault("type", "update");
     if (streamType.equalsIgnoreCase("single")) {
       SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
               flinkInterpreter.getStreamExecutionEnvironment(),
-              tbenv,
+              flinkInterpreter.getJavaStreamTableEnvironment("blink"),
               flinkInterpreter.getJobManager(),
               context,
               flinkInterpreter.getDefaultParallelism(),
               flinkInterpreter.getFlinkShims());
-      streamJob.run(sql);
+      try {
+        streamJob.run(sql);
+      } catch (IOException e) {
+        throw new RuntimeException("Fail to run single type stream job", e);
+      }
     } else if (streamType.equalsIgnoreCase("append")) {
       AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
               flinkInterpreter.getStreamExecutionEnvironment(),
@@ -74,7 +74,11 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
               context,
               flinkInterpreter.getDefaultParallelism(),
               flinkInterpreter.getFlinkShims());
-      streamJob.run(sql);
+      try {
+        streamJob.run(sql);
+      } catch (IOException e) {
+        throw new RuntimeException("Fail to run append type stream job", e);
+      }
     } else if (streamType.equalsIgnoreCase("update")) {
       UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
               flinkInterpreter.getStreamExecutionEnvironment(),
@@ -83,24 +87,19 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
               context,
               flinkInterpreter.getDefaultParallelism(),
               flinkInterpreter.getFlinkShims());
-      streamJob.run(sql);
+      try {
+        streamJob.run(sql);
+      } catch (IOException e) {
+        throw new RuntimeException("Fail to run update type stream job", e);
+      }
     } else {
-      throw new IOException("Unrecognized stream type: " + streamType);
+      throw new RuntimeException("Unrecognized stream type: " + streamType);
     }
   }
 
   @Override
-  public void callInsertInto(String sql, InterpreterContext context) throws IOException {
-    super.callInsertInto(sql, context);
-  }
-
-  public void cancel(InterpreterContext context) throws InterpreterException {
-    this.flinkInterpreter.cancel(context);
-  }
-
-  @Override
-  public Interpreter.FormType getFormType() throws InterpreterException {
-    return Interpreter.FormType.SIMPLE;
+  public InterpreterResult runSqlList(String st, InterpreterContext context) {
+    return flinkShims.runSqlList(st, context, false);
   }
 
   @Override
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index e3526eb..41b3014 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -37,7 +37,7 @@ import java.util.Properties;
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 
-public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
+public class FlinkBatchSqlInterpreterTest extends FlinkSqlInterpreterTest {
 
   @Override
   protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) {
@@ -201,68 +201,46 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     context = getInterpreterContext();
     result = sqlInterpreter.interpret(
             "insert into sink_table select * from source_table", context);
-    assertEquals(InterpreterResult.Code.ERROR, result.code());
+    assertEquals(context.out.toString(), InterpreterResult.Code.ERROR, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
     assertTrue(resultMessages.get(0).getData(),
             resultMessages.get(0).getData().contains("already exists"));
 
-    // insert overwrite into
-    //    context = getInterpreterContext();
-    //    result = sqlInterpreter.interpret(
-    //            "insert overwrite dest_table select id + 1, name from source_table", context);
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //    resultMessages = context.out.toInterpreterResultMessage();
-    //    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
-    //
-    //    // verify insert into via select from the dest_table
-    //    context = getInterpreterContext();
-    //    result = sqlInterpreter.interpret(
-    //            "select * from dest_table", context);
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //    resultMessages = context.out.toInterpreterResultMessage();
-    //    assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
-    //
-    //    // define scala udf
-    //    result = flinkInterpreter.interpret(
-    //            "class AddOne extends ScalarFunction {\n" +
-    //                    "  def eval(a: Int): Int = a + 1\n" +
-    //                    "}", getInterpreterContext());
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //
-    //    result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
-    //            getInterpreterContext());
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //
-    //    // insert into dest_table2 using udf
-    //    destDir = Files.createTempDirectory("flink_test").toFile();
-    //    FileUtils.deleteDirectory(destDir);
-    //    result = sqlInterpreter.interpret(
-    //            "CREATE TABLE dest_table2 (\n" +
-    //                    "id int,\n" +
-    //                    "name string" +
-    //                    ") WITH (\n" +
-    //                    "'format.field-delimiter'=',',\n" +
-    //                    "'connector.type'='filesystem',\n" +
-    //                    "'format.derive-schema'='true',\n" +
-    //                    "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
-    //                    "'format.type'='csv'\n" +
-    //                    ");", getInterpreterContext());
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //
-    //    context = getInterpreterContext();
-    //    result = sqlInterpreter.interpret(
-    //            "insert into dest_table2 select addOne(id), name from source_table", context);
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //    resultMessages = context.out.toInterpreterResultMessage();
-    //    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
-    //
-    //    // verify insert into via select from the dest table
-    //    context = getInterpreterContext();
-    //    result = sqlInterpreter.interpret(
-    //            "select * from dest_table2", context);
-    //    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    //    resultMessages = context.out.toInterpreterResultMessage();
-    //    assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
+    // insert into again will succeed after destDir is deleted
+    destDir.delete();
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "insert into sink_table select * from source_table", context);
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+
+    // define scala udf
+    result = flinkInterpreter.interpret(
+            "class AddOne extends ScalarFunction {\n" +
+                    "  def eval(a: Int): Int = a + 1\n" +
+                    "}", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // insert into sink_table again using udf
+    destDir.delete();
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "insert into sink_table select addOne(id), name from source_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+
+    // verify insert into via select from the dest table
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "select * from sink_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
   }
 
   @Test
@@ -293,8 +271,8 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
-    assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
-            sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+    assertEquals(10,
+            flinkInterpreter.getBatchTableEnvironment().getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM).intValue());
 
     // set then insert into
     destDir.delete();
@@ -305,21 +283,10 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
-    assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
-            sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
-    assertEquals(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED.defaultValue(),
-            sqlInterpreter.tbenv.getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED));
-
-    // invalid config
-    destDir.delete();
-    context = getInterpreterContext();
-    result = sqlInterpreter.interpret(
-            "set table.invalid_config=false;" +
-                    "insert into sink_table select * from source_table", context);
-    assertEquals(InterpreterResult.Code.ERROR, result.code());
-    resultMessages = context.out.toInterpreterResultMessage();
-    assertTrue(resultMessages.get(0).getData(),
-            resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
+    assertEquals(10,
+            flinkInterpreter.getBatchTableEnvironment().getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM).intValue());
+    assertEquals(false,
+            flinkInterpreter.getBatchTableEnvironment().getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED).booleanValue());
   }
 
   @Test
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index c3939db..b029304 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -19,17 +19,11 @@ package org.apache.zeppelin.flink;
 
 import junit.framework.TestCase;
 import net.jodah.concurrentunit.Waiter;
-import org.apache.commons.io.FileUtils;
 import org.apache.zeppelin.display.AngularObjectRegistry;
 import org.apache.zeppelin.display.ui.CheckBox;
 import org.apache.zeppelin.display.ui.Select;
 import org.apache.zeppelin.display.ui.TextBox;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.*;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
 import org.junit.After;
@@ -40,14 +34,13 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 
 
@@ -346,28 +339,34 @@ public class FlinkInterpreterTest {
             resultMessages.get(0).getData().contains("url\tpv\n"));
   }
 
-  // TODO(zjffdu) flaky test
-  // @Test
-  public void testResumeStreamSqlFromSavePoint() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+  @Test
+  public void testResumeStreamSqlFromSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+    if (!interpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+      LOGGER.info("Skip testResumeStreamSqlFromSavePointPath, because this test is only passed after Flink 1.14 due to FLINK-23654");
+      // By default, this thread pool in Flink JobManager is the number of cpu cores. While the cpu cores in github action container is too small
+      return;
+    }
+
     String initStreamScalaScript = FlinkStreamSqlInterpreterTest.getInitStreamScript(1000);
     InterpreterResult result = interpreter.interpret(initStreamScalaScript,
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    File savePointDir = FileUtils.getTempDirectory();
+    File savePointDir = Files.createTempDirectory("zeppelin-flink").toFile();
     final Waiter waiter = new Waiter();
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = interpreter.interpret(
                 "val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
                 "log group by url\")\nz.show(table, streamType=\"update\")", context);
-        System.out.println("------------" + context.out.toString());
-        System.out.println("------------" + result2);
+        LOGGER.info("------------" + context.out.toString());
+        LOGGER.info("------------" + result2);
         waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+        // Flink job is succeed when it is cancelled with save point.
         waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
       } catch (Exception e) {
         e.printStackTrace();
@@ -378,16 +377,25 @@ public class FlinkInterpreterTest {
     thread.start();
 
     // the streaming job will run for 60 seconds. check init_stream.scala
-    // sleep 20 seconds to make sure the job is started but not finished
-    Thread.sleep(20 * 1000);
+    // sleep 30 seconds to make sure the job is started but not finished
+    Thread.sleep(30 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     interpreter.cancel(context);
-    waiter.await(20 * 1000);
+    waiter.await(30 * 1000);
+
+    // verify save point is generated
+    String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
+    TestCase.assertTrue(allSavepointPath.length > 0);
+
     // resume job from savepoint
+    context = getInterpreterContext();
+    context.getLocalProperties().put(JobManager.SAVEPOINT_PATH, allSavepointPath[0]);
+    context.getLocalProperties().put("parallelism", "2");
+    context.getLocalProperties().put("maxParallelism", "10");
     interpreter.interpret(
             "val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
             "log group by url\")\nz.show(table, streamType=\"update\")", context);
@@ -396,6 +404,11 @@ public class FlinkInterpreterTest {
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     TestCase.assertTrue(resultMessages.toString(),
             resultMessages.get(0).getData().contains("url\tpv\n"));
+    assertEquals(resultMessages.toString(), "url\tpv\n" +
+                    "home\t10\n" +
+                    "product\t30\n" +
+                    "search\t20\n",
+            resultMessages.get(0).getData());
   }
 
   private InterpreterContext getInterpreterContext() {
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
similarity index 91%
rename from flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
index 12862d2..2673ff0 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
@@ -69,9 +69,9 @@ import static org.mockito.Mockito.mock;
 
 
 @RunWith(FlinkStandaloneHiveRunner.class)
-public abstract class SqlInterpreterTest {
+public abstract class FlinkSqlInterpreterTest {
 
-  private static final Logger LOGGER = LoggerFactory.getLogger(SqlInterpreterTest.class);
+  protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterpreterTest.class);
 
 
   protected FlinkInterpreter flinkInterpreter;
@@ -94,7 +94,7 @@ public abstract class SqlInterpreterTest {
     p.setProperty("zeppelin.pyflink.useIPython", "false");
     p.setProperty("local.number-taskmanager", "4");
     p.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
-
+    
     File hiveConfDir = Files.createTempDir();
     hiveShell.getHiveConf().writeXml(new FileWriter(new File(hiveConfDir, "hive-site.xml")));
     p.setProperty("HIVE_CONF_DIR", hiveConfDir.getAbsolutePath());
@@ -176,6 +176,15 @@ public abstract class SqlInterpreterTest {
     result = sqlInterpreter.interpret("use db1", context);
     assertEquals(Code.SUCCESS, result.code());
 
+    // show current database
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("show current database", context);
+    assertEquals(Code.SUCCESS, result.code());
+    assertEquals(1, context.out.toInterpreterResultMessage().size());
+    assertEquals(Type.TEXT, context.out.toInterpreterResultMessage().get(0).getType());
+    assertEquals("current database: db1\n", context.out.toInterpreterResultMessage().get(0).getData());
+    
+    // show tables
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("show tables", context);
     assertEquals(Code.SUCCESS, result.code());
@@ -183,10 +192,12 @@ public abstract class SqlInterpreterTest {
     assertEquals(Type.TABLE, resultMessages.get(0).getType());
     assertEquals("table\n", resultMessages.get(0).getData());
 
+    // create table
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("CREATE TABLE source (msg INT) with ('connector'='print')", context);
     assertEquals(Code.SUCCESS, result.code());
 
+    // show tables
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("show tables", context);
     assertEquals(Code.SUCCESS, result.code());
@@ -194,6 +205,7 @@ public abstract class SqlInterpreterTest {
     assertEquals(Type.TABLE, resultMessages.get(0).getType());
     assertEquals("table\nsource\n", resultMessages.get(0).getData());
 
+    // describe table
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("describe db1.source", context);
     assertEquals(Code.SUCCESS, result.code());
@@ -203,14 +215,17 @@ public abstract class SqlInterpreterTest {
                     "msg\tINT\n"
             , resultMessages.get(0).getData());
 
+    // use database
     context = getInterpreterContext();
-    result = sqlInterpreter.interpret("use default", context);
-    assertEquals(Code.SUCCESS, result.code());
+    result = sqlInterpreter.interpret("use `default`", context);
+    assertEquals(context.out.toString(), Code.SUCCESS, result.code());
 
+    // show tables
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("show tables", context);
     assertEquals(Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
     assertEquals(Type.TABLE, resultMessages.get(0).getType());
     assertEquals("table\n", resultMessages.get(0).getData());
 
@@ -257,7 +272,7 @@ public abstract class SqlInterpreterTest {
     // describe table
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("describe source_table", context);
-    assertEquals(Code.SUCCESS, result.code());
+    assertEquals(result.toString(), Code.SUCCESS, result.code());
     assertEquals(1, resultMessages.size());
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(Type.TABLE, resultMessages.get(0).getType());
@@ -275,7 +290,7 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertTrue(resultMessages.toString(),
-            resultMessages.get(0).getData().contains("Table `unknown_table` was not found."));
+            resultMessages.get(0).getData().contains("doesn't exist"));
 
     // drop unknown table
     context = getInterpreterContext();
@@ -301,7 +316,7 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertTrue(resultMessages.get(0).getData(),
-            resultMessages.get(0).getData().contains("Table `source_table` was not found"));
+            resultMessages.get(0).getData().contains("doesn't exist"));
   }
 
   @Test
@@ -414,7 +429,6 @@ public abstract class SqlInterpreterTest {
 
   @Test
   public void testInvalidSql() throws InterpreterException, IOException {
-
     InterpreterContext context = getInterpreterContext();
     InterpreterResult result = sqlInterpreter.interpret("Invalid sql", context);
     assertEquals(Code.ERROR, result.code());
@@ -422,12 +436,9 @@ public abstract class SqlInterpreterTest {
     assertEquals(1, resultMessages.size());
     assertEquals(Type.TEXT, resultMessages.get(0).getType());
     assertTrue(resultMessages.get(0).getData(),
-            resultMessages.get(0).getData().contains("Invalid Sql statement: Invalid sql"));
-    assertTrue(resultMessages.get(0).getData(),
-            resultMessages.get(0).getData().contains("The following commands are available"));
+            resultMessages.get(0).getData().contains("Invalid Sql statement"));
   }
 
-
   @Test
   public void testFunction() throws IOException, InterpreterException {
     InterpreterContext context = getInterpreterContext();
@@ -447,23 +458,20 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf"));
 
-
     // ALTER
     context = getInterpreterContext();
     result = sqlInterpreter.interpret(
             "ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; ", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
-    assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been modified."));
-
+    assertEquals("Alter function succeeded!\n",resultMessages.get(0).getData());
 
     // DROP UDF
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
-    assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been dropped."));
-
+    assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been removed."));
 
     // SHOW UDF. Due to drop UDF before, it shouldn't contain 'myudf'
     result = sqlInterpreter.interpret(
@@ -485,7 +493,7 @@ public abstract class SqlInterpreterTest {
                     ");", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
-    assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been created."));
+    assertTrue(context.out.toString(), resultMessages.get(0).getData().contains("Catalog has been created."));
 
     // USE CATALOG & SHOW DATABASES;
     context = getInterpreterContext();
@@ -496,6 +504,13 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default"));
 
+    // SHOW CURRENT CATALOG
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("SHOW CURRENT CATALOG", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals("current catalog: test_catalog\n", resultMessages.get(0).getData());
+
     // DROP CATALOG
     context = getInterpreterContext();
     result = sqlInterpreter.interpret(
@@ -510,37 +525,35 @@ public abstract class SqlInterpreterTest {
             "SHOW CATALOGS ;\n", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     resultMessages = context.out.toInterpreterResultMessage();
-    assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog"));
-    assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog"));
+    assertTrue(context.out.toString(), resultMessages.get(0).getData().contains("default_catalog"));
+    assertFalse(context.out.toString(), resultMessages.get(0).getData().contains("test_catalog"));
 
   }
 
   @Test
   public void testSetProperty() throws InterpreterException {
-    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+    // set time-zone with single quote
     InterpreterContext context = getInterpreterContext();
-    InterpreterResult result = sqlInterpreter.interpret(
-            "set table.sql-dialect=hive", context);
+    InterpreterResult result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
-    sqlInterpreter.interpret("create table test_hive_table(a string, b int)\n" +
-            "partitioned by (dt string)", context);
+    // set table.sql-dialect without quote
+    result = sqlInterpreter.interpret(
+            "set table.sql-dialect=hive", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
 
-    // table.local-time-zone is only available from 1.12
-    if (flinkVersion.newerThanOrEqual(FlinkVersion.fromVersionString("1.12.0"))) {
-      context = getInterpreterContext();
-      result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", context);
-      assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
-    }
+    // show all settings
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("SET", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+    assertTrue(context.out.toString(), context.out.toString().contains("'table.sql-dialect' = 'hive"));
+    assertTrue(context.out.toString(), context.out.toString().contains("'table.local-time-zone' = 'UTC"));
   }
 
   @Test
-  public void testShowModules() throws InterpreterException, IOException {
-    FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+  public void testModules() throws InterpreterException, IOException {
     InterpreterContext context = getInterpreterContext();
-
-    // CREATE CATALOG
+    // show modules
     InterpreterResult result = sqlInterpreter.interpret(
             "show modules", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -548,7 +561,6 @@ public abstract class SqlInterpreterTest {
     assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("core"));
   }
 
-
   protected InterpreterContext getInterpreterContext() {
     InterpreterContext context = InterpreterContext.builder()
             .setParagraphId("paragraphId")
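
Illustration (not part of the patch): the SET statements exercised in testSetProperty above follow
the Flink SQL client conventions — keys and values may optionally be wrapped in single quotes, and a
bare SET lists the whole session configuration. Such a pair ultimately lands in the session
TableConfig. A minimal hedged sketch of that mapping against the public TableEnvironment API (the
class and helper names below are illustrative, not from this commit):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SetStatementSketch {
      // Hypothetical helper: apply one "SET key = value" pair to the session TableConfig.
      static void applySet(TableEnvironment tableEnv, String key, String value) {
        // Strip the optional single quotes, as in "SET 'table.local-time-zone' = 'UTC'".
        String k = key.trim().replaceAll("^'|'$", "");
        String v = value.trim().replaceAll("^'|'$", "");
        tableEnv.getConfig().getConfiguration().setString(k, v);
      }

      public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        applySet(tableEnv, "'table.local-time-zone'", "'UTC'");  // quoted form
        applySet(tableEnv, "table.sql-dialect", "hive");         // unquoted form
      }
    }
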
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 4b3ebf9..9894764 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeoutException;
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 
-public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
+public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
 
   @Override
   protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) {
@@ -70,7 +70,8 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     context = getInterpreterContext();
-    String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\nz.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))";
+    String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\n" +
+            "z.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))";
     result = flinkInterpreter.interpret(code, context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
@@ -160,6 +161,10 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
 
   @Test
   public void testCancelStreamSql() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+    // clean the checkpoint dir first; the checkpoint dir is defined in init_stream.scala
+    File checkpointDir = new File("/tmp/flink/checkpoints");
+    FileUtils.deleteDirectory(checkpointDir);
+
     String initStreamScalaScript = getInitStreamScript(1000);
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
             getInterpreterContext());
@@ -181,13 +186,17 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     });
     thread.start();
 
-    // the streaming job will run for 20 seconds. check init_stream.scala
-    // sleep 10 seconds to make sure the job is started but not finished
-    Thread.sleep(10 * 1000);
+    // the streaming job will run for 60 seconds. check init_stream.scala
+    // sleep 30 seconds to make sure the job is started but not finished
+    Thread.sleep(30 * 1000);
 
     InterpreterContext context = getInterpreterContext();
     sqlInterpreter.cancel(context);
-    waiter.await(10 * 1000);
+    waiter.await(30 * 1000);
+
+    // verify checkpoints
+    assertTrue(checkpointDir.listFiles(f -> f.isDirectory()).length > 0);
+
     // resume job
     sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
@@ -198,117 +207,154 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
             resultMessages.get(0).getData().contains("url\tpv\n"));
   }
 
-  // TODO(zjffdu) flaky test
-  // @Test
-  public void testResumeStreamSqlFromSavePoint() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+  @Test
+  public void testResumeStreamSqlFromSavePointDir() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+    if (!flinkInterpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+      LOGGER.info("Skip testResumeStreamSqlFromSavePointDir, because this test only passes on Flink 1.14 and later due to FLINK-23654");
+      // By default, the size of this thread pool in the Flink JobManager equals the number of cpu cores, and the github action container has too few cpu cores.
+      return;
+    }
+
     String initStreamScalaScript = getInitStreamScript(1000);
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    File savePointDir = FileUtils.getTempDirectory();
+    File savePointDir = Files.createTempDirectory("zeppelin-flink").toFile();
     final Waiter waiter = new Waiter();
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
                 "log group by url", context);
-        System.out.println("------------" + context.out.toString());
-        System.out.println("------------" + result2);
+        LOGGER.info("------------" + context.out.toString());
+        LOGGER.info("------------" + result2);
         waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+        // The Flink job succeeds when it is cancelled with a savepoint.
         waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
       } catch (Exception e) {
-        e.printStackTrace();
+        LOGGER.error("Should not throw exception", e);
         waiter.fail("Should not fail here");
       }
       waiter.resume();
     });
     thread.start();
 
-    // the streaming job will run for 20 seconds. check init_stream.scala
-    // sleep 10 seconds to make sure the job is started but not finished
-    Thread.sleep(10 * 1000);
+    // the streaming job will run for 60 seconds. check init_stream.scala
+    // sleep 30 seconds to make sure the job is started but not finished
+    Thread.sleep(30 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     sqlInterpreter.cancel(context);
-    waiter.await(10 * 1000);
+    waiter.await(30 * 1000);
+
+    // verify save point is generated
+    String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
+    assertTrue(allSavepointPath.length > 0);
+
     // resume job from savepoint
+    context = getInterpreterContext();
+    context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
+    context.getLocalProperties().put("parallelism", "2");
+    context.getLocalProperties().put("maxParallelism", "10");
     sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
-    assertTrue(resultMessages.toString(),
-            resultMessages.get(0).getData().contains("url\tpv\n"));
+    assertEquals(resultMessages.toString(), "url\tpv\n" +
+                    "home\t10\n" +
+                    "product\t30\n" +
+                    "search\t20\n",
+            resultMessages.get(0).getData());
   }
 
-  // TODO(zjffdu) flaky test
-  //@Test
+  @Test
   public void testResumeStreamSqlFromExistSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
-    String initStreamScalaScript = getInitStreamScript(2000);
+    if (!flinkInterpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+      LOGGER.info("Skip testResumeStreamSqlFromExistSavePointPath, because this test only passes on Flink 1.14 and later due to FLINK-23654");
+      // By default, the size of this thread pool in the Flink JobManager equals the number of cpu cores, and the github action container has too few cpu cores.
+      return;
+    }
+
+    String initStreamScalaScript = getInitStreamScript(1000);
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    File savePointDir = FileUtils.getTempDirectory();
+    File savePointDir = Files.createTempDirectory("zeppelin-flink").toFile();
     final Waiter waiter = new Waiter();
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+        context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
         InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
                 "log group by url", context);
+        LOGGER.info("------------" + context.out.toString());
+        LOGGER.info("------------" + result2);
         waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+        // The Flink job succeeds when it is cancelled with a savepoint.
         waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
       } catch (Exception e) {
-        e.printStackTrace();
+        LOGGER.error("Should not throw exception", e);
         waiter.fail("Should not fail here");
       }
       waiter.resume();
     });
     thread.start();
 
-    // the streaming job will run for 20 seconds. check init_stream.scala
-    // sleep 10 seconds to make sure the job is started but not finished
-    Thread.sleep(10 * 1000);
+    // the streaming job will run for 60 seconds. check init_stream.scala
+    // sleep 30 seconds to make sure the job is started but not finished
+    Thread.sleep(30 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+    context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
     sqlInterpreter.cancel(context);
-    waiter.await(10 * 1000);
+    waiter.await(30 * 1000);
 
     // get an existing savepoint path from the temp directory
     // if there is more than one savepoint dir, take the first one; fail if there is none
     String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
-    assertTrue(allSavepointPath.length>0);
+    assertTrue(allSavepointPath.length > 0);
 
     String savepointPath = savePointDir.getAbsolutePath().concat(File.separator).concat(allSavepointPath[0]);
 
     // resume job from an existing savepointPath
+    context = getInterpreterContext();
     context.getConfig().put(JobManager.SAVEPOINT_PATH,savepointPath);
+    context.getLocalProperties().put("parallelism", "2");
+    context.getLocalProperties().put("maxParallelism", "10");
     sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
-    assertTrue(resultMessages.toString(),
-            resultMessages.get(0).getData().contains("url\tpv\n"));
-
+    assertEquals(resultMessages.toString(), "url\tpv\n" +
+                    "home\t10\n" +
+                    "product\t30\n" +
+                    "search\t20\n",
+            resultMessages.get(0).getData());
   }
 
   @Test
   public void testResumeStreamSqlFromInvalidSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
-    String initStreamScalaScript = getInitStreamScript(2000);
+    if (!flinkInterpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+      LOGGER.info("Skip testResumeStreamSqlFromInvalidSavePointPath, because this test only passes on Flink 1.14 and later due to FLINK-23654");
+      // By default, the size of this thread pool in the Flink JobManager equals the number of cpu cores, and the github action container has too few cpu cores.
+      return;
+    }
+
+    String initStreamScalaScript = getInitStreamScript(1000);
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
             getInterpreterContext());
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -343,10 +389,11 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
             "log group by url", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
-//    assertEquals(InterpreterResult.Type.TABLE,
-//            updatedOutput.toInterpreterResultMessage().getType());
-//    assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
-//            !updatedOutput.toInterpreterResultMessage().getData().isEmpty());
+    assertTrue(context.out.toString(), !context.out.toInterpreterResultMessage().isEmpty());
+    assertEquals(InterpreterResult.Type.TABLE,
+            context.out.toInterpreterResultMessage().get(0).getType());
+    assertTrue(context.out.toInterpreterResultMessage().get(0).getData(),
+            !context.out.toInterpreterResultMessage().get(0).getData().isEmpty());
   }
 
   @Test
@@ -389,6 +436,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
   @Test
   public void testMultipleInsertInto() throws InterpreterException, IOException {
     hiveShell.execute("create table source_table (id int, name string)");
+    hiveShell.execute("insert into source_table values(1, 'name')");
 
     File destDir = Files.createTempDirectory("flink_test").toFile();
     FileUtils.deleteDirectory(destDir);
@@ -426,16 +474,29 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     result = sqlInterpreter.interpret(
             "insert into dest_table select * from source_table;insert into dest_table2 select * from source_table",
             context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // check dest_table
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("c\n1\n", context.out.toString());
 
+    // check dest_table2
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("c\n1\n", context.out.toString());
 
     // runAsOne won't affect the select statement.
     context = getInterpreterContext();
     context.getLocalProperties().put("runAsOne", "true");
     result = sqlInterpreter.interpret(
-            "select 1",
+            "select 1 as a",
             context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("a\n1\n", context.out.toString());
   }
 
   @Test
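
Illustration (not part of the patch): the multiple-insert test above drives the paragraph-local
property runAsOne, which groups all INSERT statements of one paragraph into a single Flink job. The
Flink primitive behind that behaviour is a StatementSet. A minimal hedged sketch, with hypothetical
datagen/blackhole tables standing in for the Hive tables created by the test:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.StatementSet;
    import org.apache.flink.table.api.TableEnvironment;

    public class RunAsOneSketch {
      public static void main(String[] args) {
        TableEnvironment tableEnv =
            TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
        tableEnv.executeSql("CREATE TABLE src (id INT) WITH ('connector'='datagen', 'number-of-rows'='10')");
        tableEnv.executeSql("CREATE TABLE dest_table (id INT) WITH ('connector'='blackhole')");
        tableEnv.executeSql("CREATE TABLE dest_table2 (id INT) WITH ('connector'='blackhole')");

        // Equivalent of runAsOne=true: both INSERTs are submitted together as one job.
        StatementSet statementSet = tableEnv.createStatementSet();
        statementSet.addInsertSql("INSERT INTO dest_table SELECT * FROM src");
        statementSet.addInsertSql("INSERT INTO dest_table2 SELECT * FROM src");
        statementSet.execute();
      }
    }
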
diff --git a/flink/flink-scala-parent/src/test/resources/init_stream.scala b/flink/flink-scala-parent/src/test/resources/init_stream.scala
index f8d27ae..14993ea 100644
--- a/flink/flink-scala-parent/src/test/resources/init_stream.scala
+++ b/flink/flink-scala-parent/src/test/resources/init_stream.scala
@@ -4,9 +4,16 @@ import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
 import java.util.Collections
 import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+
 
 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-senv.enableCheckpointing(5000)
+senv.enableCheckpointing(10000)
+senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
+
+val chkConfig = senv.getCheckpointConfig
+chkConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
 
 val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
 
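
Illustration (not part of the patch): the lines added above are what let testCancelStreamSql assert
on /tmp/flink/checkpoints after cancelling the job — checkpoints are written every 10 seconds to a
filesystem state backend and are retained on cancellation. A hedged Java rendering of the same
configuration, for readers less familiar with the Scala REPL form:

    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointConfigSketch {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10000);  // checkpoint every 10 seconds
        env.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
        // Keep externalized checkpoints around on cancellation so the job can be resumed from them.
        env.getCheckpointConfig()
           .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
      }
    }
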
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 640eba6..2a3d82e 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -19,11 +19,8 @@ package org.apache.zeppelin.flink;
 
 
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,8 +28,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.net.InetAddress;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 /**
@@ -47,13 +42,15 @@ public abstract class FlinkShims {
 
   protected Properties properties;
   protected FlinkVersion flinkVersion;
+  protected FlinkSqlContext flinkSqlContext;
 
   public FlinkShims(FlinkVersion flinkVersion, Properties properties) {
     this.flinkVersion = flinkVersion;
     this.properties = properties;
   }
 
-  private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties properties)
+  private static FlinkShims loadShims(FlinkVersion flinkVersion,
+                                      Properties properties)
       throws Exception {
     Class<?> flinkShimsClass;
     if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
@@ -87,21 +84,14 @@ public abstract class FlinkShims {
     return flinkShims;
   }
 
-  protected static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) {
-    return new AttributedStringBuilder()
-            .style(AttributedStyle.DEFAULT.bold())
-            .append(cmd.toString())
-            .append("\t\t")
-            .style(AttributedStyle.DEFAULT)
-            .append(description)
-            .append('\n')
-            .toAttributedString();
-  }
-
   public FlinkVersion getFlinkVersion() {
     return flinkVersion;
   }
 
+  public abstract void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext);
+
+  public abstract void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext);
+
   public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig);
 
   public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment);
@@ -116,12 +106,6 @@ public abstract class FlinkShims {
 
   public abstract List collectToList(Object table) throws Exception;
 
-  public abstract void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception;
-
-  public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;
-
-  public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception;
-
   public abstract boolean rowEquals(Object row1, Object row2);
 
   public abstract Object fromDataSet(Object btenv, Object ds);
@@ -138,22 +122,12 @@ public abstract class FlinkShims {
 
   public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink);
 
-  public abstract Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt);
-
-  public abstract void executeSql(Object tableEnv, String sql);
-
-  public abstract String explain(Object tableEnv, String sql);
-
-  public abstract String sqlHelp();
-
   public abstract void setCatalogManagerSchemaResolver(Object catalogManager,
                                                        Object parser,
                                                        Object environmentSetting);
 
   public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object executorConfig);
 
-  public abstract Map extractTableConfigOptions();
-
   public void setBatchRuntimeMode(Object tableConfig) {
     // only needed after flink 1.13
   }
@@ -169,4 +143,6 @@ public abstract class FlinkShims {
   public abstract ImmutablePair<Object, Object> createPlannerAndExecutor(
           ClassLoader classLoader, Object environmentSettings, Object sEnv,
           Object tableConfig, Object functionCatalog, Object catalogManager);
+
+  public abstract InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch);
 }
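
Illustration (not part of the patch): the new hooks above define the contract between the
version-agnostic interpreter and the per-version shims — the interpreter hands over a
FlinkSqlContext once, then routes every paragraph through runSqlList. A hedged sketch of that
wiring from the caller side (class, method, and variable names are illustrative):

    import org.apache.zeppelin.flink.FlinkShims;
    import org.apache.zeppelin.flink.FlinkSqlContext;
    import org.apache.zeppelin.interpreter.InterpreterContext;
    import org.apache.zeppelin.interpreter.InterpreterResult;

    public class ShimWiringSketch {
      // Hypothetical batch-mode wiring; the Object parameters mirror the FlinkSqlContext fields.
      static InterpreterResult runBatchSql(FlinkShims shims,
                                           Object benv, Object senv, Object btenv, Object stenv, Object z,
                                           String paragraphText, InterpreterContext context) {
        FlinkSqlContext sqlContext = new FlinkSqlContext(benv, senv, btenv, stenv, z,
            // The select consumer is only exercised by the stream interpreter.
            selectSql -> { });
        shims.initInnerBatchSqlInterpreter(sqlContext);
        // Every statement of the paragraph is delegated to the version-specific inner interpreter.
        return shims.runSqlList(paragraphText, context, true);
      }
    }
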
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkSqlContext.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkSqlContext.java
new file mode 100644
index 0000000..769a51f
--- /dev/null
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkSqlContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.flink;
+
+import java.util.function.Consumer;
+
+
+/**
+ * Context class which is used to pass context from flink-scala-x module to flink-shims module.
+ */
+public class FlinkSqlContext {
+
+    private Object benv;
+    private Object senv;
+    private Object btenv;
+    private Object stenv;
+    private Object z;
+    private Consumer<String> streamSqlSelectConsumer;
+
+    public FlinkSqlContext(Object benv,
+                           Object senv,
+                           Object btenv,
+                           Object stenv,
+                           Object z,
+                           Consumer<String> streamSqlSelectConsumer) {
+        this.benv = benv;
+        this.senv = senv;
+        this.btenv = btenv;
+        this.stenv = stenv;
+        this.z = z;
+        this.streamSqlSelectConsumer = streamSqlSelectConsumer;
+    }
+
+    public Object getBenv() {
+        return benv;
+    }
+
+    public Object getSenv() {
+        return senv;
+    }
+
+    public Object getBtenv() {
+        return btenv;
+    }
+
+    public Object getStenv() {
+        return stenv;
+    }
+
+    public Object getZeppelinContext() {
+        return z;
+    }
+
+    public Consumer<String> getStreamSqlSelectConsumer() {
+        return streamSqlSelectConsumer;
+    }
+}
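
A note on the shape of this class (commentary, not part of the patch): the environments are typed
as Object, presumably so that flink-shims keeps no compile-time dependency on any concrete Flink
version; each version-specific shim casts them back to the real types. A hedged sketch of such a
cast on the shim side (the accessor below is hypothetical):

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.zeppelin.flink.FlinkSqlContext;

    public class SqlContextCastSketch {
      static TableEnvironment streamTableEnv(FlinkSqlContext ctx) {
        // getStenv() returns Object; only the shim module, compiled against one
        // concrete Flink version, knows and restores the real type.
        return (TableEnvironment) ctx.getStenv();
      }
    }
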
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
deleted file mode 100644
index 1a4f0e3..0000000
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.sql;
-
-import org.apache.zeppelin.flink.FlinkShims;
-
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.regex.Pattern;
-
-/**
- * Simple parser for determining the type of command and its parameters.
- * All the SqlCommands should be put into this class, and the parsing logic needs to be put ito FlinkShims
- * because each version of flink has different sql syntax support.
- */
-public final class SqlCommandParser {
-
-  private FlinkShims flinkShims;
-  private Object tableEnv;
-
-  public SqlCommandParser(FlinkShims flinkShims, Object tableEnv) {
-    this.flinkShims = flinkShims;
-    this.tableEnv = tableEnv;
-  }
-
-  public Optional<SqlCommandCall> parse(String stmt) {
-    return flinkShims.parseSql(tableEnv, stmt);
-  }
-
-  // --------------------------------------------------------------------------------------------
-
-  private static final Function<String[], Optional<String[]>> NO_OPERANDS =
-          (operands) -> Optional.of(new String[0]);
-
-  private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
-          (operands) -> Optional.of(new String[]{operands[0]});
-
-  private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
-
-  /**
-   * Supported SQL commands.
-   */
-  public enum SqlCommand {
-    QUIT(
-            "(QUIT|EXIT)",
-            NO_OPERANDS),
-
-    CLEAR(
-            "CLEAR",
-            NO_OPERANDS),
-
-    HELP(
-            "HELP",
-            NO_OPERANDS),
-
-    SHOW_CATALOGS(
-            "SHOW\\s+CATALOGS",
-            NO_OPERANDS),
-
-    SHOW_DATABASES(
-            "SHOW\\s+DATABASES",
-            NO_OPERANDS),
-
-    SHOW_TABLES(
-            "SHOW\\s+TABLES",
-            NO_OPERANDS),
-
-    SHOW_FUNCTIONS(
-            "SHOW\\s+FUNCTIONS",
-            NO_OPERANDS),
-
-    SHOW_MODULES(
-            "SHOW\\s+MODULES",
-            NO_OPERANDS),
-
-    USE_CATALOG(
-            "USE\\s+CATALOG\\s+(.*)",
-            SINGLE_OPERAND),
-
-    USE(
-            "USE\\s+(?!CATALOG)(.*)",
-            SINGLE_OPERAND),
-
-    CREATE_CATALOG(null, SINGLE_OPERAND),
-
-    DROP_CATALOG(null, SINGLE_OPERAND),
-
-    DESC(
-            "DESC\\s+(.*)",
-            SINGLE_OPERAND),
-
-    DESCRIBE(
-            "DESCRIBE\\s+(.*)",
-            SINGLE_OPERAND),
-
-    EXPLAIN(
-            "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)",
-            (operands) -> {
-              return Optional.of(new String[] { operands[0], operands[1] });
-            }),
-
-    CREATE_DATABASE(
-            "(CREATE\\s+DATABASE\\s+.*)",
-            SINGLE_OPERAND),
-
-    DROP_DATABASE(
-            "(DROP\\s+DATABASE\\s+.*)",
-            SINGLE_OPERAND),
-
-    ALTER_DATABASE(
-            "(ALTER\\s+DATABASE\\s+.*)",
-            SINGLE_OPERAND),
-
-    CREATE_TABLE("(CREATE\\s+TABLE\\s+.*)", SINGLE_OPERAND),
-
-    DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND),
-
-    ALTER_TABLE(
-            "(ALTER\\s+TABLE\\s+.*)",
-            SINGLE_OPERAND),
-
-    DROP_VIEW(
-            "DROP\\s+VIEW\\s+(.*)",
-            SINGLE_OPERAND),
-
-    CREATE_VIEW(
-            "CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
-            (operands) -> {
-              if (operands.length < 2) {
-                return Optional.empty();
-              }
-              return Optional.of(new String[]{operands[0], operands[1]});
-            }),
-
-    CREATE_FUNCTION(null, SINGLE_OPERAND),
-
-    DROP_FUNCTION(null, SINGLE_OPERAND),
-
-    ALTER_FUNCTION(null, SINGLE_OPERAND),
-
-    SELECT(
-            "(SELECT.*)",
-            SINGLE_OPERAND),
-
-    INSERT_INTO(
-            "(INSERT\\s+INTO.*)",
-            SINGLE_OPERAND),
-
-    INSERT_OVERWRITE(
-            "(INSERT\\s+OVERWRITE.*)",
-            SINGLE_OPERAND),
-
-    SET(
-            "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
-            (operands) -> {
-              if (operands.length < 3) {
-                return Optional.empty();
-              } else if (operands[0] == null) {
-                return Optional.of(new String[0]);
-              }
-              return Optional.of(new String[]{operands[1], operands[2]});
-            }),
-
-    RESET(
-            "RESET",
-            NO_OPERANDS),
-
-    SOURCE(
-            "SOURCE\\s+(.*)",
-            SINGLE_OPERAND);
-
-    public final Pattern pattern;
-    public final Function<String[], Optional<String[]>> operandConverter;
-
-    SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
-      if (matchingRegex == null) {
-        this.pattern = null;
-      } else {
-        this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
-      }
-      this.operandConverter = operandConverter;
-    }
-
-    @Override
-    public String toString() {
-      return super.toString().replace('_', ' ');
-    }
-
-    public boolean hasOperands() {
-      return operandConverter != NO_OPERANDS;
-    }
-  }
-
-  /**
-   * Call of SQL command with operands and command type.
-   */
-  public static class SqlCommandCall {
-    public final SqlCommand command;
-    public final String[] operands;
-    public final String sql;
-
-    public SqlCommandCall(SqlCommand command, String[] operands, String sql) {
-      this.command = command;
-      this.operands = operands;
-      this.sql = sql;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      SqlCommandCall that = (SqlCommandCall) o;
-      return command == that.command && Arrays.equals(operands, that.operands);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = Objects.hash(command);
-      result = 31 * result + Arrays.hashCode(operands);
-      return result;
-    }
-
-    @Override
-    public String toString() {
-      return command + "(" + Arrays.toString(operands) + ")";
-    }
-  }
-}
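
For context (not part of the patch): the removed class matched statements with case-insensitive,
DOTALL regular expressions; per-version replacements such as
org.apache.zeppelin.flink.shims112.SqlCommandParser (imported by Flink112SqlInterpreter below) take
over that role, since each Flink release supports a different SQL surface. A tiny self-contained
sketch of the matching style, with the pattern copied from the removed enum:

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class RegexCommandSketch {
      private static final int FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
      private static final Pattern USE_CATALOG = Pattern.compile("USE\\s+CATALOG\\s+(.*)", FLAGS);

      public static void main(String[] args) {
        Matcher m = USE_CATALOG.matcher("use catalog my_catalog");
        if (m.matches()) {
          System.out.println("USE CATALOG -> " + m.group(1));  // the single operand: the catalog name
        }
      }
    }
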
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index a713d1c..e182f97 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -22,73 +22,32 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.delegation.*;
 import org.apache.flink.table.factories.ComponentFactoryService;
 import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
@@ -97,29 +56,19 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
 import org.apache.zeppelin.flink.shims112.CollectStreamTableSink;
 import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
 
 
 /**
@@ -128,41 +77,28 @@ import java.util.regex.Matcher;
 public class Flink112Shims extends FlinkShims {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
-  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
-          .append("The following commands are available:\n\n")
-          .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
-          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
-          .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-          .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
-          .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-          .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
-          .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
-          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
-          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
-          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
-          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
-          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
-          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-          .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
-          .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
-          .style(AttributedStyle.DEFAULT.underline())
-          .append("\nHint")
-          .style(AttributedStyle.DEFAULT)
-          .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
-          .toAttributedString();
-
-  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+  private Flink112SqlInterpreter batchSqlInterpreter;
+  private Flink112SqlInterpreter streamSqlInterpreter;
+
 
   public Flink112Shims(FlinkVersion flinkVersion, Properties properties) {
     super(flinkVersion, properties);
   }
+
+  public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+    this.batchSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, true);
+  }
+
+  public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+    this.streamSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, false);
+  }
+
   @Override
   public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
     // do nothing
   }
 
-
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
     return new StreamExecutionEnvironmentFactory() {
@@ -222,31 +158,6 @@ public class Flink112Shims extends FlinkShims {
   }
 
   @Override
-  public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
-    StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
-    statementSetMap.put(context.getParagraphId(), statementSet);
-  }
-
-  @Override
-  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
-    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
-  }
-
-  @Override
-  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
-    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
-    while (!jobClient.getJobStatus().get().isTerminalState()) {
-      LOGGER.debug("Wait for job to finish");
-      Thread.sleep(1000 * 5);
-    }
-    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
-      context.out.write("Job is cancelled.\n");
-      return false;
-    }
-    return true;
-  }
-
-  @Override
   public boolean rowEquals(Object row1, Object row2) {
     Row r1 = (Row) row1;
     Row r2 = (Row) row2;
@@ -292,151 +203,6 @@ public class Flink112Shims extends FlinkShims {
   }
 
   /**
-   * Parse it via flink SqlParser first, then fallback to regular expression matching.
-   *
-   * @param tableEnv
-   * @param stmt
-   * @return
-   */
-  @Override
-  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
-    Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
-    SqlCommandCall sqlCommandCall = null;
-    try {
-      // parse statement via regex matching first
-      Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
-      if (callOpt.isPresent()) {
-        sqlCommandCall = callOpt.get();
-      } else {
-        sqlCommandCall = parseBySqlParser(sqlParser, stmt);
-      }
-    } catch (Exception e) {
-      return Optional.empty();
-    }
-    return Optional.of(sqlCommandCall);
-
-  }
-
-  private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
-    List<Operation> operations;
-    try {
-      operations = sqlParser.parse(stmt);
-    } catch (Throwable e) {
-      throw new Exception("Invalidate SQL statement.", e);
-    }
-    if (operations.size() != 1) {
-      throw new Exception("Only single statement is supported now.");
-    }
-
-    final SqlCommand cmd;
-    String[] operands = new String[]{stmt};
-    Operation operation = operations.get(0);
-    if (operation instanceof CatalogSinkModifyOperation) {
-      boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
-      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
-    } else if (operation instanceof CreateTableOperation) {
-      cmd = SqlCommand.CREATE_TABLE;
-    } else if (operation instanceof DropTableOperation) {
-      cmd = SqlCommand.DROP_TABLE;
-    } else if (operation instanceof AlterTableOperation) {
-      cmd = SqlCommand.ALTER_TABLE;
-    } else if (operation instanceof CreateViewOperation) {
-      cmd = SqlCommand.CREATE_VIEW;
-    } else if (operation instanceof DropViewOperation) {
-      cmd = SqlCommand.DROP_VIEW;
-    } else if (operation instanceof CreateDatabaseOperation) {
-      cmd = SqlCommand.CREATE_DATABASE;
-    } else if (operation instanceof DropDatabaseOperation) {
-      cmd = SqlCommand.DROP_DATABASE;
-    } else if (operation instanceof AlterDatabaseOperation) {
-      cmd = SqlCommand.ALTER_DATABASE;
-    } else if (operation instanceof CreateCatalogOperation) {
-      cmd = SqlCommand.CREATE_CATALOG;
-    } else if (operation instanceof DropCatalogOperation) {
-      cmd = SqlCommand.DROP_CATALOG;
-    } else if (operation instanceof UseCatalogOperation) {
-      cmd = SqlCommand.USE_CATALOG;
-      operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
-    } else if (operation instanceof UseDatabaseOperation) {
-      cmd = SqlCommand.USE;
-      operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
-    } else if (operation instanceof ShowCatalogsOperation) {
-      cmd = SqlCommand.SHOW_CATALOGS;
-      operands = new String[0];
-    } else if (operation instanceof ShowDatabasesOperation) {
-      cmd = SqlCommand.SHOW_DATABASES;
-      operands = new String[0];
-    } else if (operation instanceof ShowTablesOperation) {
-      cmd = SqlCommand.SHOW_TABLES;
-      operands = new String[0];
-    } else if (operation instanceof ShowFunctionsOperation) {
-      cmd = SqlCommand.SHOW_FUNCTIONS;
-      operands = new String[0];
-    } else if (operation instanceof CreateCatalogFunctionOperation ||
-            operation instanceof CreateTempSystemFunctionOperation) {
-      cmd = SqlCommand.CREATE_FUNCTION;
-    } else if (operation instanceof DropCatalogFunctionOperation ||
-            operation instanceof DropTempSystemFunctionOperation) {
-      cmd = SqlCommand.DROP_FUNCTION;
-    } else if (operation instanceof AlterCatalogFunctionOperation) {
-      cmd = SqlCommand.ALTER_FUNCTION;
-    } else if (operation instanceof ExplainOperation) {
-      cmd = SqlCommand.EXPLAIN;
-    } else if (operation instanceof DescribeTableOperation) {
-      cmd = SqlCommand.DESCRIBE;
-      operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
-    } else if (operation instanceof QueryOperation) {
-      cmd = SqlCommand.SELECT;
-    } else {
-      throw new Exception("Unknown operation: " + operation.asSummaryString());
-    }
-
-    return new SqlCommandCall(cmd, operands, stmt);
-  }
-
-  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
-    // parse statement via regex matching
-    for (SqlCommand cmd : SqlCommand.values()) {
-      if (cmd.pattern != null) {
-        final Matcher matcher = cmd.pattern.matcher(stmt);
-        if (matcher.matches()) {
-          final String[] groups = new String[matcher.groupCount()];
-          for (int i = 0; i < groups.length; i++) {
-            groups[i] = matcher.group(i + 1);
-          }
-          return cmd.operandConverter.apply(groups)
-                  .map((operands) -> {
-                    String[] newOperands = operands;
-                    if (cmd == SqlCommand.EXPLAIN) {
-                      // convert `explain xx` to `explain plan for xx`
-                      // which can execute through executeSql method
-                      newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
-                    }
-                    return new SqlCommandCall(cmd, newOperands, stmt);
-                  });
-        }
-      }
-    }
-    return Optional.empty();
-  }
-
-  @Override
-  public void executeSql(Object tableEnv, String sql) {
-    ((TableEnvironment) tableEnv).executeSql(sql);
-  }
-
-  @Override
-  public String explain(Object tableEnv, String sql) {
-    TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
-    return tableResult.collect().next().getField(0).toString();
-  }
-
-  @Override
-  public String sqlHelp() {
-    return MESSAGE_HELP.toString();
-  }
-
-  /**
   * Flink 1.11 binds the CatalogManager to the parser, which means the blink and flink planners can not share the same CatalogManager.
   * This is a workaround that always resets the CatalogTableSchemaResolver before running any flink code.
    * @param catalogManager
@@ -464,36 +230,6 @@ public class Flink112Shims extends FlinkShims {
   }
 
   @Override
-  public Map extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    try {
-      configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    } catch (NoClassDefFoundError e) {
-      LOGGER.warn("No pyflink jars found");
-    }
-    configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
-  }
-
-  @Override
   public String[] rowToString(Object row, Object table, Object tableConfig) {
     return PrintUtils.rowToString((Row) row);
   }
@@ -511,10 +247,10 @@ public class Flink112Shims extends FlinkShims {
       Method createMethod = executorFactory.getClass()
               .getMethod("create", Map.class, StreamExecutionEnvironment.class);
 
-      return (Executor) createMethod.invoke(
+      return createMethod.invoke(
               executorFactory,
               executorProperties,
-              (StreamExecutionEnvironment) sEnv);
+              sEnv);
     } catch (Exception e) {
       throw new TableException(
               "Could not instantiate the executor. Make sure a planner module is on the classpath",
@@ -535,4 +271,12 @@ public class Flink112Shims extends FlinkShims {
                     (CatalogManager) catalogManager);
     return ImmutablePair.of(planner, executor);
   }
+
+  public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
+    if (isBatch) {
+      return batchSqlInterpreter.runSqlList(st, context);
+    } else {
+      return streamSqlInterpreter.runSqlList(st, context);
+    }
+  }
 }
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java
new file mode 100644
index 0000000..b72ef26
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.zeppelin.flink.shims112.SqlCommandParser;
+import org.apache.zeppelin.flink.shims112.SqlCommandParser.SqlCommand;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class Flink112SqlInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink112SqlInterpreter.class);
+  private static final AttributedString MESSAGE_HELP =
+          new AttributedStringBuilder()
+                  .append("The following commands are available:\n\n")
+                  .append(
+                          formatCommand(
+                                  SqlCommand.CREATE_TABLE,
+                                  "Create table under current catalog and database."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.DROP_TABLE,
+                                  "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.CREATE_VIEW,
+                                  "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.DESCRIBE,
+                                  "Describes the schema of a table with the given name."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.DROP_VIEW,
+                                  "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.EXPLAIN,
+                                  "Describes the execution plan of a query or table with the given name."))
+                  .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.INSERT_INTO,
+                                  "Inserts the results of a SQL SELECT query into a declared table sink."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.INSERT_OVERWRITE,
+                                  "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.SELECT,
+                                  "Executes a SQL SELECT query on the Flink cluster."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.SET,
+                                  "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.SHOW_FUNCTIONS,
+                                  "Shows all user-defined and built-in functions."))
+                  .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.USE_CATALOG,
+                                  "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+                  .append(
+                          formatCommand(
+                                  SqlCommand.USE,
+                                  "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+                  .style(AttributedStyle.DEFAULT.underline())
+                  .append("\nHint")
+                  .style(AttributedStyle.DEFAULT)
+                  .append(
+                          ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
+                  .toAttributedString();
+
+  private static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) {
+    return new AttributedStringBuilder()
+            .style(AttributedStyle.DEFAULT.bold())
+            .append(cmd.toString())
+            .append("\t\t")
+            .style(AttributedStyle.DEFAULT)
+            .append(description)
+            .append('\n')
+            .toAttributedString();
+  }
+
+  private FlinkSqlContext flinkSqlContext;
+  private TableEnvironment tbenv;
+  private ZeppelinContext z;
+  private SqlCommandParser sqlCommandParser;
+  private SqlSplitter sqlSplitter;
+  private boolean isBatch;
+  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+  // paragraphId -> StatementSet
+  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+
+  public Flink112SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+    this.flinkSqlContext = flinkSqlContext;
+    this.isBatch = isBatch;
+    if (isBatch) {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+    } else {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+    }
+    this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+    this.sqlCommandParser = new SqlCommandParser((TableEnvironmentInternal) tbenv);
+    this.sqlSplitter = new SqlSplitter();
+    JobListener jobListener = new JobListener() {
+      @Override
+      public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+        if (lock.isHeldByCurrentThread()) {
+          lock.unlock();
+          LOGGER.info("UnLock JobSubmitLock");
+        }
+      }
+
+      @Override
+      public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+      }
+    };
+
+    ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+    ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+  }
+
+  public InterpreterResult runSqlList(String st, InterpreterContext context) {
+    try {
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
+      boolean isFirstInsert = true;
+      boolean hasInsert = false;
+      for (String sql : sqls) {
+        SqlCommandParser.SqlCommandCall sqlCommand = null;
+        try {
+          sqlCommand = sqlCommandParser.parse(sql);
+        } catch (Exception e1) {
+          try {
+            context.out.write("%text Invalid Sql statement: " + sql + "\n");
+            context.out.write(e1.toString());
+            context.out.write(MESSAGE_HELP.toString());
+          } catch (IOException e2) {
+            return new InterpreterResult(InterpreterResult.Code.ERROR, e2.toString());
+          }
+          return new InterpreterResult(InterpreterResult.Code.ERROR);
+        }
+
+        try {
+          if (sqlCommand.command == SqlCommand.INSERT_INTO ||
+                  sqlCommand.command == SqlCommand.INSERT_OVERWRITE) {
+            hasInsert = true;
+            if (isFirstInsert && runAsOne) {
+              startMultipleInsert(context);
+              isFirstInsert = false;
+            }
+          }
+          callCommand(sqlCommand, sql, context);
+          context.out.flush();
+        } catch (Throwable e) {
+          LOGGER.error("Fail to run sql:" + sql, e);
+          try {
+            context.out.write("%text Fail to run sql command: " +
+                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+          } catch (IOException ex) {
+            LOGGER.warn("Unexpected exception:", ex);
+            return new InterpreterResult(InterpreterResult.Code.ERROR,
+                    ExceptionUtils.getStackTrace(e));
+          }
+          return new InterpreterResult(InterpreterResult.Code.ERROR);
+        }
+      }
+
+      if (runAsOne && hasInsert) {
+        try {
+          lock.lock();
+          String jobName = context.getStringLocalProperty("jobName", st);
+          if (executeMultipleInsertInto(jobName, context)) {
+            context.out.write("Insertion successfully.\n");
+          }
+        } catch (Exception e) {
+          LOGGER.error("Fail to execute sql as one job", e);
+          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+        } finally {
+          if (lock.isHeldByCurrentThread()) {
+            lock.unlock();
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to execute sql", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      statementSetMap.remove(context.getParagraphId());
+    }
+
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+  }
+
+  private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
+                           String sql,
+                           InterpreterContext context) throws Exception {
+    switch (cmdCall.command) {
+      case SET:
+        callSet(cmdCall, context);
+        break;
+      case HELP:
+        callHelp(context);
+        break;
+      case SHOW_CATALOGS:
+        callShowCatalogs(context);
+        break;
+      case SHOW_CURRENT_CATALOG:
+        callShowCurrentCatalog(context);
+        break;
+      case SHOW_DATABASES:
+        callShowDatabases(context);
+        break;
+      case SHOW_CURRENT_DATABASE:
+        callShowCurrentDatabase(context);
+        break;
+      case SHOW_TABLES:
+        callShowTables(context);
+        break;
+      case SHOW_FUNCTIONS:
+        callShowFunctions(context);
+        break;
+      case SHOW_MODULES:
+        callShowModules(context);
+        break;
+      case SHOW_PARTITIONS:
+        callShowPartitions(sql, context);
+        break;
+      case USE_CATALOG:
+        callUseCatalog(cmdCall.operands[0], context);
+        break;
+      case USE:
+        callUseDatabase(cmdCall.operands[0], context);
+        break;
+      case DESC:
+      case DESCRIBE:
+        callDescribe(cmdCall.operands[0], context);
+        break;
+      case EXPLAIN:
+        callExplain(cmdCall.operands[0], context);
+        break;
+      case SELECT:
+        callSelect(cmdCall.operands[0], context);
+        break;
+      case INSERT_INTO:
+      case INSERT_OVERWRITE:
+        callInsertInto(cmdCall.operands[0], context);
+        break;
+      case CREATE_TABLE:
+        callDDL(sql, context, "Table has been created.");
+        break;
+      case DROP_TABLE:
+        callDDL(sql, context, "Table has been dropped.");
+        break;
+      case ALTER_TABLE:
+        callDDL(sql, context, "Alter table succeeded!");
+        break;
+      case CREATE_VIEW:
+        callDDL(sql, context, "View has been created.");
+        break;
+      case DROP_VIEW:
+        callDDL(sql, context, "View has been dropped.");
+        break;
+      case ALTER_VIEW:
+        callDDL(sql, context, "Alter view succeeded!");
+        break;
+      case CREATE_FUNCTION:
+        callDDL(sql, context, "Function has been created.");
+        break;
+      case DROP_FUNCTION:
+        callDDL(sql, context, "Function has been removed.");
+        break;
+      case ALTER_FUNCTION:
+        callDDL(sql, context, "Alter function succeeded!");
+        break;
+      case CREATE_DATABASE:
+        callDDL(sql, context, "Database has been created.");
+        break;
+      case DROP_DATABASE:
+        callDDL(sql, context, "Database has been removed.");
+        break;
+      case ALTER_DATABASE:
+        callDDL(sql, context, "Alter database succeeded!");
+        break;
+      case CREATE_CATALOG:
+        callDDL(sql, context, "Catalog has been created.");
+        break;
+      case DROP_CATALOG:
+        callDDL(sql, context, "Catalog has been dropped.");
+        break;
+      default:
+        throw new Exception("Unsupported command: " + cmdCall.command);
+    }
+  }
+
+  private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+    try {
+      lock.lock();
+      this.tbenv.executeSql(sql);
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+    context.out.write(message + "\n");
+  }
+
+  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+    tbenv.executeSql("USE CATALOG `" + catalog + "`");
+  }
+
+  private void callHelp(InterpreterContext context) throws IOException {
+    context.out.write(MESSAGE_HELP.toString() + "\n");
+  }
+
+  private void callShowCatalogs(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+    List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+  }
+
+  private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+    String catalog = tableResult.collect().next().toString();
+    context.out.write("%text current catalog: " + catalog + "\n");
+  }
+
+  private void callShowDatabases(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+    List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+  }
+
+  private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+    String database = tableResult.collect().next().toString();
+    context.out.write("%text current database: " + database + "\n");
+  }
+
+  private void callShowTables(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+    List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+  }
+
+  private void callShowFunctions(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  private void callShowModules(InterpreterContext context) throws IOException {
+    String[] modules = this.tbenv.listModules();
+    context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+  }
+
+  private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql(sql);
+    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table partitions\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  public void startMultipleInsert(InterpreterContext context) throws Exception {
+    StatementSet statementSet = tbenv.createStatementSet();
+    statementSetMap.put(context.getParagraphId(), statementSet);
+  }
+
+  public void addInsertStatement(String sql, InterpreterContext context) throws Exception {
+    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
+  }
+
+  public boolean executeMultipleInsertInto(String jobName, InterpreterContext context) throws Exception {
+    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
+    while (!jobClient.getJobStatus().get().isTerminalState()) {
+      LOGGER.debug("Wait for job to finish");
+      Thread.sleep(1000 * 5);
+    }
+    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
+      context.out.write("Job is cancelled.\n");
+      return false;
+    }
+    return true;
+  }
+
+  private void callUseDatabase(String databaseName,
+                               InterpreterContext context) throws IOException {
+    this.tbenv.executeSql("USE `" + databaseName + "`");
+  }
+
+  private void callDescribe(String name, InterpreterContext context) throws IOException {
+    TableResult tableResult = null;
+    try {
+      tableResult = tbenv.executeSql("DESCRIBE " + name);
+    } catch (Exception e) {
+      throw new IOException("Fail to describe table: " + name, e);
+    }
+    CloseableIterator<Row> result = tableResult.collect();
+    StringBuilder builder = new StringBuilder();
+    builder.append("Column\tType\n");
+    while (result.hasNext()) {
+      Row row = result.next();
+      builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+    }
+    context.out.write("%table\n" + builder.toString());
+  }
+
+  private void callExplain(String sql, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = tbenv.executeSql(sql);
+      String result = tableResult.collect().next().getField(0).toString();
+      context.out.write(result + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callSelect(String sql, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      if (isBatch) {
+        callBatchInnerSelect(sql, context);
+      } else {
+        callStreamInnerSelect(sql, context);
+      }
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    String result = z.showData(table);
+    context.out.write(result);
+  }
+
+  public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+    flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+  }
+
+  private String removeSingleQuote(String value) {
+    value = value.trim();
+    if (value.startsWith("'")) {
+      value = value.substring(1);
+    }
+    if (value.endsWith("'")) {
+      value = value.substring(0, value.length() - 1);
+    }
+    return value;
+  }
+
+  public void callSet(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws Exception {
+    if (sqlCommand.operands.length == 0) {
+      // show all properties
+      final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+      List<String> prettyEntries = new ArrayList<>();
+      for (String k : properties.keySet()) {
+        prettyEntries.add(
+                String.format(
+                        "'%s' = '%s'",
+                        EncodingUtils.escapeSingleQuotes(k),
+                        EncodingUtils.escapeSingleQuotes(properties.get(k))));
+      }
+      prettyEntries.sort(String::compareTo);
+      prettyEntries.forEach(entry -> {
+        try {
+          context.out.write(entry + "\n");
+        } catch (IOException e) {
+          LOGGER.warn("Fail to write output", e);
+        }
+      });
+    } else {
+      String key = removeSingleQuote(sqlCommand.operands[0]);
+      String value = removeSingleQuote(sqlCommand.operands[1]);
+      if ("execution.runtime-mode".equals(key)) {
+        throw new UnsupportedOperationException("Setting execution.runtime-mode is not supported, " +
+                "use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode");
+      }
+      LOGGER.info("Set table config: {}={}", key, value);
+      this.tbenv.getConfig().getConfiguration().setString(key, value);
+    }
+  }
+
+  public void callInsertInto(String sql,
+                             InterpreterContext context) throws IOException {
+    if (!isBatch) {
+      context.getLocalProperties().put("flink.streaming.insert_into", "true");
+    }
+    try {
+      lock.lock();
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+      if (!runAsOne) {
+        this.tbenv.sqlUpdate(sql);
+        String jobName = context.getStringLocalProperty("jobName", sql);
+        this.tbenv.execute(jobName);
+        context.out.write("Insertion successfully.\n");
+      } else {
+        addInsertStatement(sql, context);
+      }
+    } catch (Exception e) {
+      throw new IOException(e);
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+}
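
When the paragraph-local property runAsOne is true, the INSERT statements above are not submitted one by one; they are buffered into a single StatementSet and executed as one Flink job. A minimal sketch of that path, assuming a TableEnvironment named tEnv with hypothetical tables source_1, source_2, sink_a and sink_b already registered:

    StatementSet stmtSet = tEnv.createStatementSet();
    stmtSet.addInsertSql("INSERT INTO sink_a SELECT * FROM source_1");
    stmtSet.addInsertSql("INSERT INTO sink_b SELECT * FROM source_2");
    JobClient jobClient = stmtSet.execute().getJobClient().get();
    // executeMultipleInsertInto above then polls jobClient.getJobStatus() until a terminal state is reached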
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java
new file mode 100644
index 0000000..309250f
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims112;
+
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.ddl.*;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This is copied from the Flink project with minor modifications.
+ * Simple parser for determining the type of command and its parameters.
+ */
+public final class SqlCommandParser {
+
+  private Parser parser;
+
+  public SqlCommandParser(TableEnvironmentInternal tbenv) {
+    this.parser = tbenv.getParser();
+  }
+
+  /**
+   * Parse a sql statement and return corresponding {@link SqlCommandCall}. If the statement is
+   * invalid, a {@link Exception} will be thrown.
+   *
+   * @param stmt The statement to be parsed
+   * @return the corresponding SqlCommandCall.
+   */
+  public SqlCommandCall parse(String stmt) throws Exception {
+    // normalize
+    stmt = stmt.trim();
+    // remove ';' at the end
+    if (stmt.endsWith(";")) {
+      stmt = stmt.substring(0, stmt.length() - 1).trim();
+    }
+
+    // parse statement via regex matching first
+    Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
+    if (callOpt.isPresent()) {
+      return callOpt.get();
+    } else {
+      return parseBySqlParser(stmt);
+    }
+  }
+
+  private SqlCommandCall parseBySqlParser(String stmt) throws Exception {
+    List<Operation> operations;
+    try {
+      operations = parser.parse(stmt);
+    } catch (Throwable e) {
+      throw new Exception("Invalidate SQL statement.", e);
+    }
+    if (operations.size() != 1) {
+      throw new Exception("Only single statement is supported now.");
+    }
+
+    final SqlCommand cmd;
+    String[] operands = new String[] {stmt};
+    Operation operation = operations.get(0);
+    if (operation instanceof CatalogSinkModifyOperation) {
+      boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
+      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
+    } else if (operation instanceof CreateTableOperation) {
+      cmd = SqlCommand.CREATE_TABLE;
+    } else if (operation instanceof DropTableOperation) {
+      cmd = SqlCommand.DROP_TABLE;
+    } else if (operation instanceof AlterTableOperation) {
+      cmd = SqlCommand.ALTER_TABLE;
+    } else if (operation instanceof CreateViewOperation) {
+      cmd = SqlCommand.CREATE_VIEW;
+    } else if (operation instanceof DropViewOperation) {
+      cmd = SqlCommand.DROP_VIEW;
+    } else if (operation instanceof AlterViewOperation) {
+      cmd = SqlCommand.ALTER_VIEW;
+    } else if (operation instanceof CreateDatabaseOperation) {
+      cmd = SqlCommand.CREATE_DATABASE;
+    } else if (operation instanceof DropDatabaseOperation) {
+      cmd = SqlCommand.DROP_DATABASE;
+    } else if (operation instanceof AlterDatabaseOperation) {
+      cmd = SqlCommand.ALTER_DATABASE;
+    } else if (operation instanceof CreateCatalogOperation) {
+      cmd = SqlCommand.CREATE_CATALOG;
+    } else if (operation instanceof DropCatalogOperation) {
+      cmd = SqlCommand.DROP_CATALOG;
+    } else if (operation instanceof UseCatalogOperation) {
+      cmd = SqlCommand.USE_CATALOG;
+      operands = new String[] {((UseCatalogOperation) operation).getCatalogName()};
+    } else if (operation instanceof UseDatabaseOperation) {
+      cmd = SqlCommand.USE;
+      operands = new String[] {((UseDatabaseOperation) operation).getDatabaseName()};
+    } else if (operation instanceof ShowCatalogsOperation) {
+      cmd = SqlCommand.SHOW_CATALOGS;
+      operands = new String[0];
+    } else if (operation instanceof ShowCurrentCatalogOperation) {
+      cmd = SqlCommand.SHOW_CURRENT_CATALOG;
+      operands = new String[0];
+    } else if (operation instanceof ShowDatabasesOperation) {
+      cmd = SqlCommand.SHOW_DATABASES;
+      operands = new String[0];
+    } else if (operation instanceof ShowCurrentDatabaseOperation) {
+      cmd = SqlCommand.SHOW_CURRENT_DATABASE;
+      operands = new String[0];
+    } else if (operation instanceof ShowTablesOperation) {
+      cmd = SqlCommand.SHOW_TABLES;
+      operands = new String[0];
+    } else if (operation instanceof ShowFunctionsOperation) {
+      cmd = SqlCommand.SHOW_FUNCTIONS;
+      operands = new String[0];
+    } else if (operation instanceof ShowPartitionsOperation) {
+      cmd = SqlCommand.SHOW_PARTITIONS;
+    } else if (operation instanceof CreateCatalogFunctionOperation
+            || operation instanceof CreateTempSystemFunctionOperation) {
+      cmd = SqlCommand.CREATE_FUNCTION;
+    } else if (operation instanceof DropCatalogFunctionOperation
+            || operation instanceof DropTempSystemFunctionOperation) {
+      cmd = SqlCommand.DROP_FUNCTION;
+    } else if (operation instanceof AlterCatalogFunctionOperation) {
+      cmd = SqlCommand.ALTER_FUNCTION;
+    } else if (operation instanceof ExplainOperation) {
+      cmd = SqlCommand.EXPLAIN;
+    } else if (operation instanceof DescribeTableOperation) {
+      cmd = SqlCommand.DESCRIBE;
+      operands =
+              new String[] {
+                      ((DescribeTableOperation) operation)
+                              .getSqlIdentifier()
+                              .asSerializableString()
+              };
+    } else if (operation instanceof QueryOperation) {
+      cmd = SqlCommand.SELECT;
+    } else {
+      throw new Exception("Unknown operation: " + operation.asSummaryString());
+    }
+
+    return new SqlCommandCall(cmd, operands);
+  }
+
+  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
+    // parse statement via regex matching
+    for (SqlCommand cmd : SqlCommand.values()) {
+      if (cmd.hasRegexPattern()) {
+        final Matcher matcher = cmd.pattern.matcher(stmt);
+        if (matcher.matches()) {
+          final String[] groups = new String[matcher.groupCount()];
+          for (int i = 0; i < groups.length; i++) {
+            groups[i] = matcher.group(i + 1);
+          }
+          return cmd.operandConverter
+                  .apply(groups)
+                  .map(
+                          (operands) -> {
+                            String[] newOperands = operands;
+                            if (cmd == SqlCommand.EXPLAIN) {
+                              // convert `explain xx` to `explain plan for xx`
+                              // which can execute through executeSql method
+                              newOperands =
+                                      new String[] {
+                                              "EXPLAIN PLAN FOR "
+                                                      + operands[0]
+                                                      + " "
+                                                      + operands[1]
+                                      };
+                            }
+                            return new SqlCommandCall(cmd, newOperands);
+                          });
+        }
+      }
+    }
+    return Optional.empty();
+  }
+
+  // --------------------------------------------------------------------------------------------
+
+  private static final Function<String[], Optional<String[]>> NO_OPERANDS =
+          (operands) -> Optional.of(new String[0]);
+
+  private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
+          (operands) -> Optional.of(new String[] {operands[0]});
+
+  private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+
+  /** Supported SQL commands. */
+  public enum SqlCommand {
+
+    HELP("HELP", NO_OPERANDS),
+
+    SHOW_CATALOGS,
+
+    SHOW_CURRENT_CATALOG,
+
+    SHOW_DATABASES,
+
+    SHOW_CURRENT_DATABASE,
+
+    SHOW_TABLES,
+
+    SHOW_FUNCTIONS,
+
+    // FLINK-17396
+    SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS),
+
+    SHOW_PARTITIONS,
+
+    USE_CATALOG,
+
+    USE,
+
+    CREATE_CATALOG,
+
+    DROP_CATALOG,
+
+    DESC("DESC\\s+(.*)", SINGLE_OPERAND),
+
+    DESCRIBE,
+
+    // supports both `explain xx` and `explain plan for xx` now
+    // TODO should keep `explain xx` ?
+    // only match "EXPLAIN SELECT xx" and "EXPLAIN INSERT xx" here
+    // "EXPLAIN PLAN FOR xx" should be parsed via sql parser
+    EXPLAIN(
+            "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)",
+            (operands) -> {
+              return Optional.of(new String[] {operands[0], operands[1]});
+            }),
+
+    CREATE_DATABASE,
+
+    DROP_DATABASE,
+
+    ALTER_DATABASE,
+
+    CREATE_TABLE,
+
+    DROP_TABLE,
+
+    ALTER_TABLE,
+
+    CREATE_VIEW,
+
+    DROP_VIEW,
+
+    ALTER_VIEW,
+
+    CREATE_FUNCTION,
+
+    DROP_FUNCTION,
+
+    ALTER_FUNCTION,
+
+    SELECT,
+
+    INSERT_INTO,
+
+    INSERT_OVERWRITE,
+
+    SET(
+            "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
+            (operands) -> {
+              if (operands.length < 3) {
+                return Optional.empty();
+              } else if (operands[0] == null) {
+                return Optional.of(new String[0]);
+              }
+              return Optional.of(new String[] {operands[1], operands[2]});
+            }),
+
+    SOURCE("SOURCE\\s+(.*)", SINGLE_OPERAND);
+
+    public final @Nullable Pattern pattern;
+    public final @Nullable Function<String[], Optional<String[]>> operandConverter;
+
+    SqlCommand() {
+      this.pattern = null;
+      this.operandConverter = null;
+    }
+
+    SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
+      this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+      this.operandConverter = operandConverter;
+    }
+
+    @Override
+    public String toString() {
+      return super.toString().replace('_', ' ');
+    }
+
+    public boolean hasOperands() {
+      return operandConverter != NO_OPERANDS;
+    }
+
+    public boolean hasRegexPattern() {
+      return pattern != null;
+    }
+  }
+
+  /** Call of SQL command with operands and command type. */
+  public static class SqlCommandCall {
+    public final SqlCommand command;
+    public final String[] operands;
+
+    public SqlCommandCall(SqlCommand command, String[] operands) {
+      this.command = command;
+      this.operands = operands;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      SqlCommandCall that = (SqlCommandCall) o;
+      return command == that.command && Arrays.equals(operands, that.operands);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(command);
+      result = 31 * result + Arrays.hashCode(operands);
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return command + "(" + Arrays.toString(operands) + ")";
+    }
+  }
+}
\ No newline at end of file
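
For statements matched by regex (SET, HELP, DESC, EXPLAIN SELECT/INSERT, SHOW MODULES, SOURCE) the parser skips the Flink SQL parser entirely and returns a SqlCommandCall carrying the command type and the extracted operands. A small sketch, assuming a TableEnvironmentInternal named tEnv is at hand:

    SqlCommandParser parser = new SqlCommandParser(tEnv);
    SqlCommandParser.SqlCommandCall call = parser.parse("SET table.exec.resource.default-parallelism=4;");
    // call.command  == SqlCommand.SET
    // call.operands == {"table.exec.resource.default-parallelism", "4"}; quotes and whitespace are stripped later in callSet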
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 481bfee..a4743e8 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -22,7 +22,6 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -30,36 +29,21 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.scala.DataSet;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.PlannerType;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.*;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.delegation.PlannerFactory;
 import org.apache.flink.table.factories.ComponentFactoryService;
@@ -67,32 +51,6 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
@@ -101,30 +59,20 @@ import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
 import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
 import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.time.ZoneId;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
 
 
 /**
@@ -133,42 +81,27 @@ import java.util.regex.Matcher;
 public class Flink113Shims extends FlinkShims {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
-  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
-          .append("The following commands are available:\n\n")
-          .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
-          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
-          .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-          .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
-          .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-          .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
-          .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
-          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
-          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
-          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
-          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
-          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
-          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-          .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
-          .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
-          .style(AttributedStyle.DEFAULT.underline())
-          .append("\nHint")
-          .style(AttributedStyle.DEFAULT)
-          .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
-          .toAttributedString();
-
-  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+  private Flink113SqlInterpreter batchSqlInterpreter;
+  private Flink113SqlInterpreter streamSqlInterpreter;
 
   public Flink113Shims(FlinkVersion flinkVersion, Properties properties) {
     super(flinkVersion, properties);
   }
 
+  public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+    this.batchSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, true);
+  }
+
+  public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+    this.streamSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, false);
+  }
+
   @Override
   public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
     // do nothing
   }
 
-
   @Override
   public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
     return new StreamExecutionEnvironmentFactory() {
@@ -228,31 +161,6 @@ public class Flink113Shims extends FlinkShims {
   }
 
   @Override
-  public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
-    StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
-    statementSetMap.put(context.getParagraphId(), statementSet);
-  }
-
-  @Override
-  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
-    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
-  }
-
-  @Override
-  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
-    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
-    while (!jobClient.getJobStatus().get().isTerminalState()) {
-      LOGGER.debug("Wait for job to finish");
-      Thread.sleep(1000 * 5);
-    }
-    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
-      context.out.write("Job is cancelled.\n");
-      return false;
-    }
-    return true;
-  }
-
-  @Override
   public boolean rowEquals(Object row1, Object row2) {
     Row r1 = (Row) row1;
     Row r2 = (Row) row2;
@@ -298,151 +206,6 @@ public class Flink113Shims extends FlinkShims {
   }
 
   /**
-   * Parse it via flink SqlParser first, then fallback to regular expression matching.
-   *
-   * @param tableEnv
-   * @param stmt
-   * @return
-   */
-  @Override
-  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
-    Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
-    SqlCommandCall sqlCommandCall = null;
-    try {
-      // parse statement via regex matching first
-      Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
-      if (callOpt.isPresent()) {
-        sqlCommandCall = callOpt.get();
-      } else {
-        sqlCommandCall = parseBySqlParser(sqlParser, stmt);
-      }
-    } catch (Exception e) {
-      return Optional.empty();
-    }
-    return Optional.of(sqlCommandCall);
-
-  }
-
-  private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
-    List<Operation> operations;
-    try {
-      operations = sqlParser.parse(stmt);
-    } catch (Throwable e) {
-      throw new Exception("Invalidate SQL statement.", e);
-    }
-    if (operations.size() != 1) {
-      throw new Exception("Only single statement is supported now.");
-    }
-
-    final SqlCommand cmd;
-    String[] operands = new String[]{stmt};
-    Operation operation = operations.get(0);
-    if (operation instanceof CatalogSinkModifyOperation) {
-      boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
-      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
-    } else if (operation instanceof CreateTableOperation) {
-      cmd = SqlCommand.CREATE_TABLE;
-    } else if (operation instanceof DropTableOperation) {
-      cmd = SqlCommand.DROP_TABLE;
-    } else if (operation instanceof AlterTableOperation) {
-      cmd = SqlCommand.ALTER_TABLE;
-    } else if (operation instanceof CreateViewOperation) {
-      cmd = SqlCommand.CREATE_VIEW;
-    } else if (operation instanceof DropViewOperation) {
-      cmd = SqlCommand.DROP_VIEW;
-    } else if (operation instanceof CreateDatabaseOperation) {
-      cmd = SqlCommand.CREATE_DATABASE;
-    } else if (operation instanceof DropDatabaseOperation) {
-      cmd = SqlCommand.DROP_DATABASE;
-    } else if (operation instanceof AlterDatabaseOperation) {
-      cmd = SqlCommand.ALTER_DATABASE;
-    } else if (operation instanceof CreateCatalogOperation) {
-      cmd = SqlCommand.CREATE_CATALOG;
-    } else if (operation instanceof DropCatalogOperation) {
-      cmd = SqlCommand.DROP_CATALOG;
-    } else if (operation instanceof UseCatalogOperation) {
-      cmd = SqlCommand.USE_CATALOG;
-      operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
-    } else if (operation instanceof UseDatabaseOperation) {
-      cmd = SqlCommand.USE;
-      operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
-    } else if (operation instanceof ShowCatalogsOperation) {
-      cmd = SqlCommand.SHOW_CATALOGS;
-      operands = new String[0];
-    } else if (operation instanceof ShowDatabasesOperation) {
-      cmd = SqlCommand.SHOW_DATABASES;
-      operands = new String[0];
-    } else if (operation instanceof ShowTablesOperation) {
-      cmd = SqlCommand.SHOW_TABLES;
-      operands = new String[0];
-    } else if (operation instanceof ShowFunctionsOperation) {
-      cmd = SqlCommand.SHOW_FUNCTIONS;
-      operands = new String[0];
-    } else if (operation instanceof CreateCatalogFunctionOperation ||
-            operation instanceof CreateTempSystemFunctionOperation) {
-      cmd = SqlCommand.CREATE_FUNCTION;
-    } else if (operation instanceof DropCatalogFunctionOperation ||
-            operation instanceof DropTempSystemFunctionOperation) {
-      cmd = SqlCommand.DROP_FUNCTION;
-    } else if (operation instanceof AlterCatalogFunctionOperation) {
-      cmd = SqlCommand.ALTER_FUNCTION;
-    } else if (operation instanceof ExplainOperation) {
-      cmd = SqlCommand.EXPLAIN;
-    } else if (operation instanceof DescribeTableOperation) {
-      cmd = SqlCommand.DESCRIBE;
-      operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
-    } else if (operation instanceof QueryOperation) {
-      cmd = SqlCommand.SELECT;
-    } else {
-      throw new Exception("Unknown operation: " + operation.asSummaryString());
-    }
-
-    return new SqlCommandCall(cmd, operands, stmt);
-  }
-
-  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
-    // parse statement via regex matching
-    for (SqlCommand cmd : SqlCommand.values()) {
-      if (cmd.pattern != null) {
-        final Matcher matcher = cmd.pattern.matcher(stmt);
-        if (matcher.matches()) {
-          final String[] groups = new String[matcher.groupCount()];
-          for (int i = 0; i < groups.length; i++) {
-            groups[i] = matcher.group(i + 1);
-          }
-          return cmd.operandConverter.apply(groups)
-                  .map((operands) -> {
-                    String[] newOperands = operands;
-                    if (cmd == SqlCommand.EXPLAIN) {
-                      // convert `explain xx` to `explain plan for xx`
-                      // which can execute through executeSql method
-                      newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
-                    }
-                    return new SqlCommandCall(cmd, newOperands, stmt);
-                  });
-        }
-      }
-    }
-    return Optional.empty();
-  }
-
-  @Override
-  public void executeSql(Object tableEnv, String sql) {
-    ((TableEnvironment) tableEnv).executeSql(sql);
-  }
-
-  @Override
-  public String explain(Object tableEnv, String sql) {
-    TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
-    return tableResult.collect().next().getField(0).toString();
-  }
-
-  @Override
-  public String sqlHelp() {
-    return MESSAGE_HELP.toString();
-  }
-
-  /**
    * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
    * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
    * @param catalogManager
@@ -468,36 +231,6 @@ public class Flink113Shims extends FlinkShims {
   }
 
   @Override
-  public Map extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    try {
-      configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    } catch (NoClassDefFoundError e) {
-      LOGGER.warn("No pyflink jars found");
-    }
-    configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
-  }
-
-  @Override
   public void setBatchRuntimeMode(Object tableConfig) {
     ((TableConfig) tableConfig).getConfiguration()
             .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
@@ -558,4 +291,12 @@ public class Flink113Shims extends FlinkShims {
                     (CatalogManager) catalogManager);
     return ImmutablePair.of(planner, executor);
   }
+
+  public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
+    if (isBatch) {
+      return batchSqlInterpreter.runSqlList(st, context);
+    } else {
+      return streamSqlInterpreter.runSqlList(st, context);
+    }
+  }
 }
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
new file mode 100644
index 0000000..e3fe741
--- /dev/null
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.*;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class Flink113SqlInterpreter {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(Flink113SqlInterpreter.class);
+
+  private static final AttributedString MESSAGE_HELP =
+          new AttributedStringBuilder()
+                  .append("The following commands are available:\n\n")
+                  .append(
+                          formatCommand(
+                                  "CREATE TABLE",
+                                  "Create table under current catalog and database."))
+                  .append(
+                          formatCommand(
+                                  "DROP TABLE",
+                                  "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
+                  .append(
+                          formatCommand(
+                                  "CREATE VIEW",
+                                  "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+                  .append(
+                          formatCommand(
+                                  "DESCRIBE",
+                                  "Describes the schema of a table with the given name."))
+                  .append(
+                          formatCommand(
+                                  "DROP VIEW",
+                                  "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
+                  .append(
+                          formatCommand(
+                                  "EXPLAIN",
+                                  "Describes the execution plan of a query or table with the given name."))
+                  .append(formatCommand("HELP", "Prints the available commands."))
+                  .append(
+                          formatCommand(
+                                  "INSERT INTO",
+                                  "Inserts the results of a SQL SELECT query into a declared table sink."))
+                  .append(
+                          formatCommand(
+                                  "INSERT OVERWRITE",
+                                  "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
+                  .append(
+                          formatCommand(
+                                  "SELECT", "Executes a SQL SELECT query on the Flink cluster."))
+                  .append(
+                          formatCommand(
+                                  "SET",
+                                  "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+                  .append(
+                          formatCommand(
+                                  "SHOW FUNCTIONS",
+                                  "Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'"))
+                  .append(formatCommand("SHOW TABLES", "Shows all registered tables."))
+                  .append(
+                          formatCommand(
+                                  "USE CATALOG",
+                                  "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+                  .append(
+                          formatCommand(
+                                  "USE",
+                                  "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+                  .append(
+                          formatCommand(
+                                  "LOAD MODULE",
+                                  "Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = "
+                                          + "'<value1>' [, '<key2>' = '<value2>', ...])];'"))
+                  .append(
+                          formatCommand(
+                                  "UNLOAD MODULE",
+                                  "Unload a module. Syntax: 'UNLOAD MODULE <name>;'"))
+                  .append(
+                          formatCommand(
+                                  "USE MODULES",
+                                  "Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'"))
+                  .append(
+                          formatCommand(
+                                  "BEGIN STATEMENT SET",
+                                  "Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'"))
+                  .append(formatCommand("END", "Ends a statement set. Syntax: 'END;'"))
+                  .style(AttributedStyle.DEFAULT.underline())
+                  .append("\nHint")
+                  .style(AttributedStyle.DEFAULT)
+                  .append(
+                          ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
+                  .toAttributedString();
+
+  private static AttributedString formatCommand(String cmd, String description) {
+    return new AttributedStringBuilder()
+            .style(AttributedStyle.DEFAULT.bold())
+            .append(cmd)
+            .append("\t\t")
+            .style(AttributedStyle.DEFAULT)
+            .append(description)
+            .append('\n')
+            .toAttributedString();
+  }
+
+  private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit.";
+
+  private FlinkSqlContext flinkSqlContext;
+  private TableEnvironment tbenv;
+  private ZeppelinContext z;
+  private Parser sqlParser;
+  private SqlSplitter sqlSplitter;
+  // paragraphId -> Boolean, indicates whether runAsOne mode is enabled for the current paragraph.
+  private Map<String, Boolean> statementModeMap = new HashMap<>();
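+  // paragraphId -> buffered INSERT operations; only populated when runAsOne or a statement set is active.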
+  private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
+  private boolean isBatch;
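+  // Write lock that serializes job submission; released by the JobListener once a job is submitted, or when the current operation finishes.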
+  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
+
+  public Flink113SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+    this.flinkSqlContext = flinkSqlContext;
+    this.isBatch = isBatch;
+    if (isBatch) {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+    } else {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+    }
+    this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+    this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser();
+    this.sqlSplitter = new SqlSplitter();
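+    // Release the submit lock as soon as a job has been submitted, so a long-running job does not block submissions from other paragraphs.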
+    JobListener jobListener = new JobListener() {
+      @Override
+      public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+        if (lock.isHeldByCurrentThread()) {
+          lock.unlock();
+          LOGGER.info("UnLock JobSubmitLock");
+        }
+      }
+
+      @Override
+      public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+      }
+    };
+
+    ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+    ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+  }
+
+  public InterpreterResult runSqlList(String st, InterpreterContext context) {
+    try {
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+      statementModeMap.put(context.getParagraphId(), runAsOne);
+      String jobName = context.getLocalProperties().get("jobName");
+      if (StringUtils.isNotBlank(jobName)) {
+        tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
+      }
+
+      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
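+      // Execute the statements one by one; the first failure aborts the whole paragraph.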
+      for (String sql : sqls) {
+        List<Operation> operations = null;
+        try {
+          operations = sqlParser.parse(sql);
+        } catch (SqlParserException e) {
+          context.out.write("%text Invalid Sql statement: " + sql + "\n");
+          context.out.write(MESSAGE_HELP.toString());
+          return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
+        }
+
+        try {
+          callOperation(sql, operations.get(0), context);
+          context.out.flush();
+        } catch (Throwable e) {
+          LOGGER.error("Fail to run sql:" + sql, e);
+          try {
+            context.out.write("%text Fail to run sql command: " +
+                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+          } catch (IOException ex) {
+            LOGGER.warn("Unexpected exception:", ex);
+            return new InterpreterResult(InterpreterResult.Code.ERROR,
+                    ExceptionUtils.getStackTrace(e));
+          }
+          return new InterpreterResult(InterpreterResult.Code.ERROR);
+        }
+      }
+
+      if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
+        try {
+          lock.lock();
+          List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+          if (!modifyOperations.isEmpty()) {
+            callInserts(modifyOperations, context);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Fail to execute sql as one job", e);
+          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+        } finally {
+          if (lock.isHeldByCurrentThread()) {
+            lock.unlock();
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to execute sql", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      statementOperationsMap.remove(context.getParagraphId());
+      statementModeMap.remove(context.getParagraphId());
+    }
+
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+  }
+
+  private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException {
+    if (operation instanceof HelpOperation) {
+      callHelp(context);
+    } else if (operation instanceof SetOperation) {
+      callSet((SetOperation) operation, context);
+    } else if (operation instanceof CatalogSinkModifyOperation) {
+      // INSERT INTO/OVERWRITE
+      callInsert((CatalogSinkModifyOperation) operation, context);
+    } else if (operation instanceof QueryOperation) {
+      // SELECT
+      callSelect(sql, (QueryOperation) operation, context);
+    } else if (operation instanceof ExplainOperation) {
+      callExplain((ExplainOperation) operation, context);
+    } else if (operation instanceof BeginStatementSetOperation) {
+      callBeginStatementSet(context);
+    } else if (operation instanceof EndStatementSetOperation) {
+      callEndStatementSet(context);
+    } else if (operation instanceof ShowCatalogsOperation) {
+      callShowCatalogs(context);
+    } else if (operation instanceof ShowCurrentCatalogOperation) {
+      callShowCurrentCatalog(context);
+    } else if (operation instanceof UseCatalogOperation) {
+      callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context);
+    } else if (operation instanceof CreateCatalogOperation) {
+      callDDL(sql, context, "Catalog has been created.");
+    } else if (operation instanceof DropCatalogOperation) {
+      callDDL(sql, context, "Catalog has been dropped.");
+    } else if (operation instanceof UseDatabaseOperation) {
+      UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation;
+      callUseDatabase(useDBOperation.getDatabaseName(), context);
+    } else if (operation instanceof CreateDatabaseOperation) {
+      callDDL(sql, context, "Database has been created.");
+    } else if (operation instanceof DropDatabaseOperation) {
+      callDDL(sql, context, "Database has been removed.");
+    } else if (operation instanceof AlterDatabaseOperation) {
+      callDDL(sql, context, "Alter database succeeded!");
+    } else if (operation instanceof ShowDatabasesOperation) {
+      callShowDatabases(context);
+    } else if (operation instanceof ShowCurrentDatabaseOperation) {
+      callShowCurrentDatabase(context);
+    } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) {
+      callDDL(sql, context, "Table has been created.");
+    } else if (operation instanceof AlterTableOperation) {
+      callDDL(sql, context, "Alter table succeeded!");
+    } else if (operation instanceof DropTableOperation) {
+      callDDL(sql, context, "Table has been dropped.");
+    } else if (operation instanceof DescribeTableOperation) {
+      DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+      callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+    } else if (operation instanceof ShowTablesOperation) {
+      callShowTables(context);
+    } else if (operation instanceof CreateViewOperation) {
+      callDDL(sql, context, "View has been created.");
+    } else if (operation instanceof DropViewOperation) {
+      callDDL(sql, context, "View has been dropped.");
+    } else if (operation instanceof AlterViewOperation) {
+      callDDL(sql, context, "Alter view succeeded!");
+    } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been created.");
+    } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been removed.");
+    } else if (operation instanceof AlterCatalogFunctionOperation) {
+      callDDL(sql, context, "Alter function succeeded!");
+    } else if (operation instanceof ShowFunctionsOperation) {
+      callShowFunctions(context);
+    } else if (operation instanceof ShowModulesOperation) {
+      callShowModules(context);
+    } else if (operation instanceof ShowPartitionsOperation) {
+      ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+      callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+    } else {
+      throw new IOException(operation.getClass().getName() + " is not supported");
+    }
+  }
+
+  private void callHelp(InterpreterContext context) throws IOException {
+    context.out.write(MESSAGE_HELP.toString());
+  }
+
+  private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
+    if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
+      List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+      modifyOperations.add(operation);
+      statementOperationsMap.put(context.getParagraphId(), modifyOperations);
+    } else {
+      callInserts(Collections.singletonList(operation), context);
+    }
+  }
+
+  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+    if (!isBatch) {
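+      // Hint for the outer stream interpreter that this paragraph runs a streaming INSERT rather than a SELECT.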
+      context.getLocalProperties().put("flink.streaming.insert_into", "true");
+    }
+    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+    checkState(tableResult.getJobClient().isPresent());
+    try {
+      tableResult.await();
+      JobClient jobClient = tableResult.getJobClient().get();
+      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+        context.out.write("Insertion successfully.\n");
+      } else {
+        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Flink job is interrupted", e);
+    } catch (ExecutionException e) {
+      throw new IOException("Flink job is failed", e);
+    }
+  }
+
+  private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+      String explanation =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(explanation + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      if (isBatch) {
+        callBatchInnerSelect(sql, context);
+      } else {
+        callStreamInnerSelect(sql, context);
+      }
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    String result = z.showData(table);
+    context.out.write(result);
+  }
+
+  public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+    flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+  }
+
+  public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+    if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+      // set a property
+      String key = setOperation.getKey().get().trim();
+      String value = setOperation.getValue().get().trim();
+      this.tbenv.getConfig().getConfiguration().setString(key, value);
+      LOGGER.info("Set table config: {}={}", key, value);
+    } else {
+      // show all properties
+      final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+      List<String> prettyEntries = new ArrayList<>();
+      for (String key : properties.keySet()) {
+        prettyEntries.add(
+                String.format(
+                        "'%s' = '%s'",
+                        EncodingUtils.escapeSingleQuotes(key),
+                        EncodingUtils.escapeSingleQuotes(properties.get(key))));
+      }
+      prettyEntries.sort(String::compareTo);
+      prettyEntries.forEach(entry -> {
+        try {
+          context.out.write(entry + "\n");
+        } catch (IOException e) {
+          LOGGER.warn("Fail to write output", e);
+        }
+      });
+    }
+  }
+
+  private void callBeginStatementSet(InterpreterContext context) throws IOException {
+    statementModeMap.put(context.getParagraphId(), true);
+  }
+
+  private void callEndStatementSet(InterpreterContext context) throws IOException {
+    List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+    if (modifyOperations != null && !modifyOperations.isEmpty()) {
+      callInserts(modifyOperations, context);
+    } else {
+      context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+    }
+    statementModeMap.remove(context.getParagraphId());
+  }
+
+  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+    tbenv.executeSql("USE CATALOG `" + catalog + "`");
+  }
+
+  private void callUseDatabase(String databaseName,
+                               InterpreterContext context) throws IOException {
+    this.tbenv.executeSql("USE `" + databaseName + "`");
+  }
+
+  private void callShowCatalogs(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+    List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+  }
+
+  private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+    String catalog = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current catalog: " + catalog + "\n");
+  }
+
+  private void callShowDatabases(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+    List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+  }
+
+  private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+    String database = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current database: " + database + "\n");
+  }
+
+  private void callShowTables(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+    List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+  }
+
+  private void callShowFunctions(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  private void callShowModules(InterpreterContext context) throws IOException {
+    String[] modules = this.tbenv.listModules();
+    context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+  }
+
+  private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql(sql);
+    List<String> partitions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table partitions\n" + StringUtils.join(partitions, "\n") + "\n");
+  }
+
+  private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+    try {
+      lock.lock();
+      this.tbenv.executeSql(sql);
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+    context.out.write(message + "\n");
+  }
+
+  private void callDescribe(String name, InterpreterContext context) throws IOException {
+    TableResult tableResult = null;
+    try {
+      tableResult = tbenv.executeSql("DESCRIBE " + name);
+    } catch (Exception e) {
+      throw new IOException("Fail to describe table: " + name, e);
+    }
+    CloseableIterator<Row> result = tableResult.collect();
+    StringBuilder builder = new StringBuilder();
+    builder.append("Column\tType\n");
+    while (result.hasNext()) {
+      Row row = result.next();
+      builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+    }
+    context.out.write("%table\n" + builder);
+  }
+}
diff --git a/flink/flink1.14-shims/pom.xml b/flink/flink1.14-shims/pom.xml
index c160548..aa6e1e4 100644
--- a/flink/flink1.14-shims/pom.xml
+++ b/flink/flink1.14-shims/pom.xml
@@ -116,6 +116,13 @@
 
         <dependency>
             <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
             <version>${flink.version}</version>
             <scope>provided</scope>
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index 56fdc7e..e23ac54 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -22,41 +22,26 @@ import org.apache.commons.cli.CommandLine;
 import org.apache.commons.compress.utils.Lists;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.PlannerType;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.*;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
 import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
 import org.apache.flink.table.catalog.CatalogManager;
 import org.apache.flink.table.catalog.FunctionCatalog;
 import org.apache.flink.table.catalog.GenericInMemoryCatalog;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.delegation.Executor;
 import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.delegation.Planner;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.factories.PlannerFactoryUtil;
@@ -64,32 +49,6 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.table.utils.PrintUtils;
@@ -97,30 +56,19 @@ import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.FlinkException;
 import org.apache.zeppelin.flink.shims114.CollectStreamTableSink;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
 import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.time.ZoneId;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
 
 
 /**
@@ -129,36 +77,22 @@ import java.util.regex.Matcher;
 public class Flink114Shims extends FlinkShims {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class);
-  public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
-          .append("The following commands are available:\n\n")
-          .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
-          .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
-          .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
-          .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
-          .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
-          .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
-          .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
-          .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
-          .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
-          .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
-          .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
-          .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
-          .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
-          .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
-          .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
-          .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
-          .style(AttributedStyle.DEFAULT.underline())
-          .append("\nHint")
-          .style(AttributedStyle.DEFAULT)
-          .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
-          .toAttributedString();
-
-  private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+  private Flink114SqlInterpreter batchSqlInterpreter;
+  private Flink114SqlInterpreter streamSqlInterpreter;
 
   public Flink114Shims(FlinkVersion flinkVersion, Properties properties) {
     super(flinkVersion, properties);
   }
 
+  public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+    this.batchSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, true);
+  }
+
+  public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+    this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false);
+  }
+
   @Override
   public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
     // do nothing
@@ -224,31 +158,6 @@ public class Flink114Shims extends FlinkShims {
   }
 
   @Override
-  public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
-    StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
-    statementSetMap.put(context.getParagraphId(), statementSet);
-  }
-
-  @Override
-  public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
-    statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
-  }
-
-  @Override
-  public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
-    JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
-    while (!jobClient.getJobStatus().get().isTerminalState()) {
-      LOGGER.debug("Wait for job to finish");
-      Thread.sleep(1000 * 5);
-    }
-    if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
-      context.out.write("Job is cancelled.\n");
-      return false;
-    }
-    return true;
-  }
-
-  @Override
   public boolean rowEquals(Object row1, Object row2) {
     Row r1 = (Row) row1;
     Row r2 = (Row) row2;
@@ -296,151 +205,6 @@ public class Flink114Shims extends FlinkShims {
   }
 
   /**
-   * Parse it via flink SqlParser first, then fallback to regular expression matching.
-   *
-   * @param tableEnv
-   * @param stmt
-   * @return
-   */
-  @Override
-  public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
-    Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
-    SqlCommandCall sqlCommandCall = null;
-    try {
-      // parse statement via regex matching first
-      Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
-      if (callOpt.isPresent()) {
-        sqlCommandCall = callOpt.get();
-      } else {
-        sqlCommandCall = parseBySqlParser(sqlParser, stmt);
-      }
-    } catch (Exception e) {
-      return Optional.empty();
-    }
-    return Optional.of(sqlCommandCall);
-
-  }
-
-  private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
-    List<Operation> operations;
-    try {
-      operations = sqlParser.parse(stmt);
-    } catch (Throwable e) {
-      throw new Exception("Invalidate SQL statement.", e);
-    }
-    if (operations.size() != 1) {
-      throw new Exception("Only single statement is supported now.");
-    }
-
-    final SqlCommand cmd;
-    String[] operands = new String[]{stmt};
-    Operation operation = operations.get(0);
-    if (operation instanceof CatalogSinkModifyOperation) {
-      boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
-      cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
-    } else if (operation instanceof CreateTableOperation) {
-      cmd = SqlCommand.CREATE_TABLE;
-    } else if (operation instanceof DropTableOperation) {
-      cmd = SqlCommand.DROP_TABLE;
-    } else if (operation instanceof AlterTableOperation) {
-      cmd = SqlCommand.ALTER_TABLE;
-    } else if (operation instanceof CreateViewOperation) {
-      cmd = SqlCommand.CREATE_VIEW;
-    } else if (operation instanceof DropViewOperation) {
-      cmd = SqlCommand.DROP_VIEW;
-    } else if (operation instanceof CreateDatabaseOperation) {
-      cmd = SqlCommand.CREATE_DATABASE;
-    } else if (operation instanceof DropDatabaseOperation) {
-      cmd = SqlCommand.DROP_DATABASE;
-    } else if (operation instanceof AlterDatabaseOperation) {
-      cmd = SqlCommand.ALTER_DATABASE;
-    } else if (operation instanceof CreateCatalogOperation) {
-      cmd = SqlCommand.CREATE_CATALOG;
-    } else if (operation instanceof DropCatalogOperation) {
-      cmd = SqlCommand.DROP_CATALOG;
-    } else if (operation instanceof UseCatalogOperation) {
-      cmd = SqlCommand.USE_CATALOG;
-      operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
-    } else if (operation instanceof UseDatabaseOperation) {
-      cmd = SqlCommand.USE;
-      operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
-    } else if (operation instanceof ShowCatalogsOperation) {
-      cmd = SqlCommand.SHOW_CATALOGS;
-      operands = new String[0];
-    } else if (operation instanceof ShowDatabasesOperation) {
-      cmd = SqlCommand.SHOW_DATABASES;
-      operands = new String[0];
-    } else if (operation instanceof ShowTablesOperation) {
-      cmd = SqlCommand.SHOW_TABLES;
-      operands = new String[0];
-    } else if (operation instanceof ShowFunctionsOperation) {
-      cmd = SqlCommand.SHOW_FUNCTIONS;
-      operands = new String[0];
-    } else if (operation instanceof CreateCatalogFunctionOperation ||
-            operation instanceof CreateTempSystemFunctionOperation) {
-      cmd = SqlCommand.CREATE_FUNCTION;
-    } else if (operation instanceof DropCatalogFunctionOperation ||
-            operation instanceof DropTempSystemFunctionOperation) {
-      cmd = SqlCommand.DROP_FUNCTION;
-    } else if (operation instanceof AlterCatalogFunctionOperation) {
-      cmd = SqlCommand.ALTER_FUNCTION;
-    } else if (operation instanceof ExplainOperation) {
-      cmd = SqlCommand.EXPLAIN;
-    } else if (operation instanceof DescribeTableOperation) {
-      cmd = SqlCommand.DESCRIBE;
-      operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
-    } else if (operation instanceof QueryOperation) {
-      cmd = SqlCommand.SELECT;
-    } else {
-      throw new Exception("Unknown operation: " + operation.asSummaryString());
-    }
-
-    return new SqlCommandCall(cmd, operands, stmt);
-  }
-
-  private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
-    // parse statement via regex matching
-    for (SqlCommand cmd : SqlCommand.values()) {
-      if (cmd.pattern != null) {
-        final Matcher matcher = cmd.pattern.matcher(stmt);
-        if (matcher.matches()) {
-          final String[] groups = new String[matcher.groupCount()];
-          for (int i = 0; i < groups.length; i++) {
-            groups[i] = matcher.group(i + 1);
-          }
-          return cmd.operandConverter.apply(groups)
-                  .map((operands) -> {
-                    String[] newOperands = operands;
-                    if (cmd == SqlCommand.EXPLAIN) {
-                      // convert `explain xx` to `explain plan for xx`
-                      // which can execute through executeSql method
-                      newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
-                    }
-                    return new SqlCommandCall(cmd, newOperands, stmt);
-                  });
-        }
-      }
-    }
-    return Optional.empty();
-  }
-
-  @Override
-  public void executeSql(Object tableEnv, String sql) {
-    ((TableEnvironment) tableEnv).executeSql(sql);
-  }
-
-  @Override
-  public String explain(Object tableEnv, String sql) {
-    TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
-    return tableResult.collect().next().getField(0).toString();
-  }
-
-  @Override
-  public String sqlHelp() {
-    return MESSAGE_HELP.toString();
-  }
-
-  /**
    * Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
    * This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
    * @param catalogManager
@@ -466,36 +230,6 @@ public class Flink114Shims extends FlinkShims {
   }
 
   @Override
-  public Map extractTableConfigOptions() {
-    Map<String, ConfigOption> configOptions = new HashMap<>();
-    configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
-    configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
-    try {
-      configOptions.putAll(extractConfigOptions(PythonOptions.class));
-    } catch (NoClassDefFoundError e) {
-      LOGGER.warn("No pyflink jars found");
-    }
-    configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
-    return configOptions;
-  }
-
-  private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
-    Map<String, ConfigOption> configOptions = new HashMap();
-    Field[] fields = clazz.getDeclaredFields();
-    for (Field field : fields) {
-      if (field.getType().isAssignableFrom(ConfigOption.class)) {
-        try {
-          ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
-          configOptions.put(configOption.key(), configOption);
-        } catch (Throwable e) {
-          LOGGER.warn("Fail to get ConfigOption", e);
-        }
-      }
-    }
-    return configOptions;
-  }
-
-  @Override
   public void setBatchRuntimeMode(Object tableConfig) {
     ((TableConfig) tableConfig).getConfiguration()
             .set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
@@ -556,4 +290,12 @@ public class Flink114Shims extends FlinkShims {
             (FunctionCatalog) functionCatalog);
     return ImmutablePair.of(planner, executor);
   }
+
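+  // Dispatch to the inner batch or stream SQL interpreter created via initInnerBatchSqlInterpreter/initInnerStreamSqlInterpreter.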
+  public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
+    if (isBatch) {
+      return batchSqlInterpreter.runSqlList(st, context);
+    } else {
+      return streamSqlInterpreter.runSqlList(st, context);
+    }
+  }
 }
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
new file mode 100644
index 0000000..f155fcc
--- /dev/null
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.*;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class Flink114SqlInterpreter {
+
+  protected static final Logger LOGGER = LoggerFactory.getLogger(Flink114SqlInterpreter.class);
+
+  private static final String CMD_DESC_DELIMITER = "\t\t";
+
+  /**
+   * SQL Client HELP command helper class.
+   */
+  private static final class SQLCliCommandsDescriptions {
+    private int commandMaxLength;
+    private final Map<String, String> commandsDescriptions;
+
+    public SQLCliCommandsDescriptions() {
+      this.commandsDescriptions = new LinkedHashMap<>();
+      this.commandMaxLength = -1;
+    }
+
+    public SQLCliCommandsDescriptions commandDescription(String command, String description) {
+      Preconditions.checkState(
+              StringUtils.isNotBlank(command), "content of command must not be empty.");
+      Preconditions.checkState(
+              StringUtils.isNotBlank(description),
+              "content of command's description must not be empty.");
+      this.updateMaxCommandLength(command.length());
+      this.commandsDescriptions.put(command, description);
+      return this;
+    }
+
+    private void updateMaxCommandLength(int newLength) {
+      Preconditions.checkState(newLength > 0);
+      if (this.commandMaxLength < newLength) {
+        this.commandMaxLength = newLength;
+      }
+    }
+
+    public AttributedString build() {
+      AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder();
+      if (!this.commandsDescriptions.isEmpty()) {
+        this.commandsDescriptions.forEach(
+                (cmd, cmdDesc) -> {
+                  attributedStringBuilder
+                          .style(AttributedStyle.DEFAULT.bold())
+                          .append(
+                                  String.format(
+                                          String.format("%%-%ds", commandMaxLength), cmd))
+                          .append(CMD_DESC_DELIMITER)
+                          .style(AttributedStyle.DEFAULT)
+                          .append(cmdDesc)
+                          .append('\n');
+                });
+      }
+      return attributedStringBuilder.toAttributedString();
+    }
+  }
+
+  private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS =
+          new SQLCliCommandsDescriptions()
+                  .commandDescription("HELP", "Prints the available commands.")
+                  .commandDescription(
+                          "SET",
+                          "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
+                  .commandDescription(
+                          "RESET",
+                          "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.")
+                  .commandDescription(
+                          "INSERT INTO",
+                          "Inserts the results of a SQL SELECT query into a declared table sink.")
+                  .commandDescription(
+                          "INSERT OVERWRITE",
+                          "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
+                  .commandDescription(
+                          "SELECT", "Executes a SQL SELECT query on the Flink cluster.")
+                  .commandDescription(
+                          "EXPLAIN",
+                          "Describes the execution plan of a query or table with the given name.")
+                  .commandDescription(
+                          "BEGIN STATEMENT SET",
+                          "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+                  .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+                  // (TODO) zjffdu, ADD/REMOVE/SHOW JAR
+                  .build();
+
+  // --------------------------------------------------------------------------------------------
+
+  public static final AttributedString MESSAGE_HELP =
+          new AttributedStringBuilder()
+                  .append("The following commands are available:\n\n")
+                  .append(SQL_CLI_COMMANDS_DESCRIPTIONS)
+                  .style(AttributedStyle.DEFAULT.underline())
+                  .append("\nHint")
+                  .style(AttributedStyle.DEFAULT)
+                  .append(
+                          ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
+                  // About Documentation Link.
+                  .style(AttributedStyle.DEFAULT)
+                  .append(
+                          "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
+                  .toAttributedString();
+
+  private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit.";
+
+  private FlinkSqlContext flinkSqlContext;
+  private TableEnvironment tbenv;
+  private ZeppelinContext z;
+  private Parser sqlParser;
+  private SqlSplitter sqlSplitter;
+  // paragraphId -> Boolean, indicates whether runAsOne mode is enabled for the current paragraph.
+  private Map<String, Boolean> statementModeMap = new HashMap<>();
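+  // paragraphId -> buffered INSERT operations; only populated when runAsOne or a statement set is active.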
+  private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
+  private boolean isBatch;
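+  // Write lock that serializes job submission; released by the JobListener once a job is submitted, or when the current operation finishes.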
+  private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
+
+  public Flink114SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+    this.flinkSqlContext = flinkSqlContext;
+    this.isBatch = isBatch;
+    if (isBatch) {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+    } else {
+      this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+    }
+    this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+    this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser();
+    this.sqlSplitter = new SqlSplitter();
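+    // Release the submit lock as soon as a job has been submitted, so a long-running job does not block submissions from other paragraphs.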
+    JobListener jobListener = new JobListener() {
+      @Override
+      public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+        if (lock.isHeldByCurrentThread()) {
+          lock.unlock();
+          LOGGER.info("UnLock JobSubmitLock");
+        }
+      }
+
+      @Override
+      public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+      }
+    };
+
+    ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+    ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+  }
+
+  public InterpreterResult runSqlList(String st, InterpreterContext context) {
+    try {
+      boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+      statementModeMap.put(context.getParagraphId(), runAsOne);
+      String jobName = context.getLocalProperties().get("jobName");
+      if (StringUtils.isNotBlank(jobName)) {
+        tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
+      }
+
+      List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
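+      // Execute the statements one by one; the first failure aborts the whole paragraph.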
+      for (String sql : sqls) {
+        List<Operation> operations = null;
+        try {
+          operations = sqlParser.parse(sql);
+        } catch (SqlParserException e) {
+          context.out.write("%text Invalid Sql statement: " + sql + "\n");
+          context.out.write(MESSAGE_HELP.toString());
+          return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
+        }
+
+        try {
+          callOperation(sql, operations.get(0), context);
+          context.out.flush();
+        } catch (Throwable e) {
+          LOGGER.error("Fail to run sql:" + sql, e);
+          try {
+            context.out.write("%text Fail to run sql command: " +
+                    sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+          } catch (IOException ex) {
+            LOGGER.warn("Unexpected exception:", ex);
+            return new InterpreterResult(InterpreterResult.Code.ERROR,
+                    ExceptionUtils.getStackTrace(e));
+          }
+          return new InterpreterResult(InterpreterResult.Code.ERROR);
+        }
+      }
+
+      if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
+        try {
+          lock.lock();
+          List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+          if (!modifyOperations.isEmpty()) {
+            callInserts(modifyOperations, context);
+          }
+        } catch (Exception e) {
+          LOGGER.error("Fail to execute sql as one job", e);
+          return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+        } finally {
+          if (lock.isHeldByCurrentThread()) {
+            lock.unlock();
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOGGER.error("Fail to execute sql", e);
+      return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+    } finally {
+      statementOperationsMap.remove(context.getParagraphId());
+      statementModeMap.remove(context.getParagraphId());
+    }
+
+    return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+  }
+
+  private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException {
+    if (operation instanceof HelpOperation) {
+      // HELP
+      callHelp(context);
+    } else if (operation instanceof SetOperation) {
+      // SET
+      callSet((SetOperation) operation, context);
+    } else if (operation instanceof CatalogSinkModifyOperation) {
+      // INSERT INTO/OVERWRITE
+      callInsert((CatalogSinkModifyOperation) operation, context);
+    } else if (operation instanceof QueryOperation) {
+      // SELECT
+      callSelect(sql, (QueryOperation) operation, context);
+    } else if (operation instanceof ExplainOperation) {
+      // EXPLAIN
+      callExplain((ExplainOperation) operation, context);
+    } else if (operation instanceof BeginStatementSetOperation) {
+      // BEGIN STATEMENT SET
+      callBeginStatementSet(context);
+    } else if (operation instanceof EndStatementSetOperation) {
+      // END
+      callEndStatementSet(context);
+    } else if (operation instanceof ShowCreateTableOperation) {
+      // SHOW CREATE TABLE
+      callShowCreateTable((ShowCreateTableOperation) operation, context);
+    } else if (operation instanceof ShowCatalogsOperation) {
+      callShowCatalogs(context);
+    } else if (operation instanceof ShowCurrentCatalogOperation) {
+      callShowCurrentCatalog(context);
+    } else if (operation instanceof UseCatalogOperation) {
+      callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context);
+    } else if (operation instanceof CreateCatalogOperation) {
+      callDDL(sql, context, "Catalog has been created.");
+    } else if (operation instanceof DropCatalogOperation) {
+      callDDL(sql, context, "Catalog has been dropped.");
+    } else if (operation instanceof UseDatabaseOperation) {
+      UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation;
+      callUseDatabase(useDBOperation.getDatabaseName(), context);
+    } else if (operation instanceof CreateDatabaseOperation) {
+      callDDL(sql, context, "Database has been created.");
+    } else if (operation instanceof DropDatabaseOperation) {
+      callDDL(sql, context, "Database has been removed.");
+    } else if (operation instanceof AlterDatabaseOperation) {
+      callDDL(sql, context, "Alter database succeeded!");
+    } else if (operation instanceof ShowDatabasesOperation) {
+      callShowDatabases(context);
+    } else if (operation instanceof ShowCurrentDatabaseOperation) {
+      callShowCurrentDatabase(context);
+    } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) {
+      callDDL(sql, context, "Table has been created.");
+    } else if (operation instanceof AlterTableOperation) {
+      callDDL(sql, context, "Alter table succeeded!");
+    } else if (operation instanceof DropTableOperation) {
+      callDDL(sql, context, "Table has been dropped.");
+    } else if (operation instanceof DescribeTableOperation) {
+      DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+      callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+    } else if (operation instanceof ShowTablesOperation) {
+      callShowTables(context);
+    } else if (operation instanceof CreateViewOperation) {
+      callDDL(sql, context, "View has been created.");
+    } else if (operation instanceof DropViewOperation) {
+      callDDL(sql, context, "View has been dropped.");
+    } else if (operation instanceof AlterViewOperation) {
+      callDDL(sql, context, "Alter view succeeded!");
+    } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been created.");
+    } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+      callDDL(sql, context, "Function has been removed.");
+    } else if (operation instanceof AlterCatalogFunctionOperation) {
+      callDDL(sql, context, "Alter function succeeded!");
+    } else if (operation instanceof ShowFunctionsOperation) {
+      callShowFunctions(context);
+    } else if (operation instanceof ShowModulesOperation) {
+      callShowModules(context);
+    } else if (operation instanceof ShowPartitionsOperation) {
+      ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+      callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+    } else {
+      throw new IOException(operation.getClass().getName() + " is not supported");
+    }
+  }
+
+
+  private void callHelp(InterpreterContext context) throws IOException {
+    context.out.write(MESSAGE_HELP.toString() + "\n");
+  }
+
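+  // INSERT INTO/OVERWRITE: when statement-set mode is active for this paragraph the operation is
+  // only buffered in statementOperationsMap; otherwise it is executed immediately via callInserts.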
+  private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
+    if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
+      List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+      modifyOperations.add(operation);
+      statementOperationsMap.put(context.getParagraphId(), modifyOperations);
+    } else {
+      callInserts(Collections.singletonList(operation), context);
+    }
+  }
+
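+  // Submits the given ModifyOperations as a single Flink job and blocks until it finishes,
+  // writing a success message on FINISHED and turning any failure into an IOException.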
+  private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+    if (!isBatch) {
+      context.getLocalProperties().put("flink.streaming.insert_into", "true");
+    }
+    TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+    checkState(tableResult.getJobClient().isPresent());
+    try {
+      tableResult.await();
+      JobClient jobClient = tableResult.getJobClient().get();
+      if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+        context.out.write("Insertion successfully.\n");
+      } else {
+        throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
+      }
+    } catch (InterruptedException e) {
+      throw new IOException("Flink job is interrupted", e);
+    } catch (ExecutionException e) {
+      throw new IOException("Flink job is failed", e);
+    }
+  }
+
+  private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation);
+      String createTableDdl =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(createTableDdl + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
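+  // EXPLAIN <query>: writes the plan text returned by the planner verbatim to the paragraph output.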
+  private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+      String explanation =
+              Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+      context.out.write(explanation + "\n");
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
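+  // SELECT: batch queries are rendered directly via z.showData, while streaming queries are
+  // delegated to the stream select consumer registered in FlinkSqlContext.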
+  public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+    try {
+      lock.lock();
+      if (isBatch) {
+        callBatchInnerSelect(sql, context);
+      } else {
+        callStreamInnerSelect(sql, context);
+      }
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+  }
+
+  public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+    Table table = this.tbenv.sqlQuery(sql);
+    String result = z.showData(table);
+    context.out.write(result);
+  }
+
+  public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+    flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+  }
+
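+  // SET: with both key and value present the property is written into this session's TableConfig;
+  // a bare SET lists all current properties. A sketch of the statements handled here (quoting of
+  // the key/value differs between Flink versions):
+  //   SET table.exec.resource.default-parallelism = 4;
+  //   SET;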
+  public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+    if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+      // set a property
+      String key = setOperation.getKey().get().trim();
+      String value = setOperation.getValue().get().trim();
+      this.tbenv.getConfig().getConfiguration().setString(key, value);
+      LOGGER.info("Set table config: {}={}", key, value);
+    } else {
+      // show all properties
+      final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+      List<String> prettyEntries = new ArrayList<>();
+      for (String key : properties.keySet()) {
+        prettyEntries.add(
+                String.format(
+                        "'%s' = '%s'",
+                        EncodingUtils.escapeSingleQuotes(key),
+                        EncodingUtils.escapeSingleQuotes(properties.get(key))));
+      }
+      prettyEntries.sort(String::compareTo);
+      prettyEntries.forEach(entry -> {
+        try {
+          context.out.write(entry + "\n");
+        } catch (IOException e) {
+          LOGGER.warn("Fail to write output", e);
+        }
+      });
+    }
+  }
+
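+  // BEGIN STATEMENT SET ... END: INSERTs between the two markers are buffered per paragraph and
+  // submitted together by callEndStatementSet, so they run as one Flink job. A sketch of the
+  // intended usage (table names are placeholders):
+  //   BEGIN STATEMENT SET;
+  //   INSERT INTO sink_a SELECT * FROM source_t;
+  //   INSERT INTO sink_b SELECT * FROM source_t;
+  //   END;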
+  private void callBeginStatementSet(InterpreterContext context) throws IOException {
+    statementModeMap.put(context.getParagraphId(), true);
+  }
+
+  private void callEndStatementSet(InterpreterContext context) throws IOException {
+    List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+    if (modifyOperations != null && !modifyOperations.isEmpty()) {
+      callInserts(modifyOperations, context);
+    } else {
+      context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+    }
+    statementModeMap.remove(context.getParagraphId());
+  }
+
+  private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+    tbenv.executeSql("USE CATALOG `" + catalog + "`");
+  }
+
+  private void callUseDatabase(String databaseName,
+                               InterpreterContext context) throws IOException {
+    this.tbenv.executeSql("USE `" + databaseName + "`");
+  }
+
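+  // The SHOW helpers below execute the corresponding SHOW statement and render the single result
+  // column as a Zeppelin %table (or as %text for the SHOW CURRENT variants).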
+  private void callShowCatalogs(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+    List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+  }
+
+  private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+    String catalog = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current catalog: " + catalog + "\n");
+  }
+
+  private void callShowDatabases(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+    List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+  }
+
+  private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+    String database = tableResult.collect().next().getField(0).toString();
+    context.out.write("%text current database: " + database + "\n");
+  }
+
+  private void callShowTables(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+    List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+  }
+
+  private void callShowFunctions(InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+    List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+  }
+
+  private void callShowModules(InterpreterContext context) throws IOException {
+    String[] modules = this.tbenv.listModules();
+    context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+  }
+
+  private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+    TableResult tableResult = this.tbenv.executeSql(sql);
+    List<String> partitions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+            .map(r -> checkNotNull(r.getField(0)).toString())
+            .collect(Collectors.toList());
+    context.out.write(
+            "%table partitions\n" + StringUtils.join(partitions, "\n") + "\n");
+  }
+
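+  // Generic DDL path: executes the raw statement under the lock and echoes the fixed success
+  // message passed in by callOperation; failures propagate to the caller's error handling.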
+  private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+    try {
+      lock.lock();
+      this.tbenv.executeSql(sql);
+    } finally {
+      if (lock.isHeldByCurrentThread()) {
+        lock.unlock();
+      }
+    }
+    context.out.write(message + "\n");
+  }
+
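+  // DESCRIBE <table>: renders the result as a two-column %table (column name and type); any
+  // further DESCRIBE columns (nullability, key, watermark, ...) are not shown.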
+  private void callDescribe(String name, InterpreterContext context) throws IOException {
+    TableResult tableResult = null;
+    try {
+      tableResult = tbenv.executeSql("DESCRIBE " + name);
+    } catch (Exception e) {
+      throw new IOException("Fail to describe table: " + name, e);
+    }
+    CloseableIterator<Row> result = tableResult.collect();
+    StringBuilder builder = new StringBuilder();
+    builder.append("Column\tType\n");
+    while (result.hasNext()) {
+      Row row = result.next();
+      builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+    }
+    context.out.write("%table\n" + builder.toString());
+  }
+}