You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2022/03/19 00:42:02 UTC
[zeppelin] branch master updated: [ZEPPELIN-5659] Refactor FlinkSqlInterpreter (#4304)
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 9d4561b [ZEPPELIN-5659] Refactor FlinkSqlInterpreter (#4304)
9d4561b is described below
commit 9d4561ba331bba6d8959775691796621faaf72bd
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Mar 19 08:41:52 2022 +0800
[ZEPPELIN-5659] Refactor FlinkSqlInterpreter (#4304)
* [ZEPPELIN-5659] Refactor FlinkSqlInterpreter
* minor update
* revert changes in zeppelin-server/pom.xml
---
flink/flink-scala-parent/pom.xml | 4 +-
.../zeppelin/flink/FlinkBatchSqlInterpreter.java | 44 +-
.../apache/zeppelin/flink/FlinkInterpreter.java | 6 +-
.../apache/zeppelin/flink/FlinkSqlInterpreter.java | 497 +----------------
.../zeppelin/flink/FlinkStreamSqlInterpreter.java | 61 ++-
.../flink/FlinkBatchSqlInterpreterTest.java | 119 ++---
.../zeppelin/flink/FlinkInterpreterTest.java | 55 +-
...reterTest.java => FlinkSqlInterpreterTest.java} | 86 +--
.../flink/FlinkStreamSqlInterpreterTest.java | 145 +++--
.../src/test/resources/init_stream.scala | 9 +-
.../java/org/apache/zeppelin/flink/FlinkShims.java | 44 +-
.../org/apache/zeppelin/flink/FlinkSqlContext.java | 73 +++
.../zeppelin/flink/sql/SqlCommandParser.java | 250 ---------
.../org/apache/zeppelin/flink/Flink112Shims.java | 306 +----------
.../zeppelin/flink/Flink112SqlInterpreter.java | 584 ++++++++++++++++++++
.../zeppelin/flink/shims112/SqlCommandParser.java | 355 +++++++++++++
.../org/apache/zeppelin/flink/Flink113Shims.java | 301 +----------
.../zeppelin/flink/Flink113SqlInterpreter.java | 559 ++++++++++++++++++++
flink/flink1.14-shims/pom.xml | 7 +
.../org/apache/zeppelin/flink/Flink114Shims.java | 300 +----------
.../zeppelin/flink/Flink114SqlInterpreter.java | 587 +++++++++++++++++++++
21 files changed, 2538 insertions(+), 1854 deletions(-)
diff --git a/flink/flink-scala-parent/pom.xml b/flink/flink-scala-parent/pom.xml
index 83a7ec7..efebced 100644
--- a/flink/flink-scala-parent/pom.xml
+++ b/flink/flink-scala-parent/pom.xml
@@ -560,7 +560,7 @@
<dependency>
<groupId>net.jodah</groupId>
<artifactId>concurrentunit</artifactId>
- <version>0.4.4</version>
+ <version>0.4.6</version>
<scope>test</scope>
</dependency>
@@ -791,7 +791,7 @@
<!-- set sun.zip.disableMemoryMapping=true because of
https://blogs.oracle.com/poonam/crashes-in-zipgetentry
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
- <argLine>-Xmx5120m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+ <argLine>-Xmx5120m -XX:MaxMetaspaceSize=1024m -Dsun.zip.disableMemoryMapping=true</argLine>
<!-- <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6006</argLine>-->
<environmentVariables>
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
index d10a9ea..dea35bf 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreter.java
@@ -17,58 +17,36 @@
package org.apache.zeppelin.flink;
-import org.apache.flink.table.api.Table;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
-import java.io.IOException;
import java.util.Properties;
public class FlinkBatchSqlInterpreter extends FlinkSqlInterpreter {
- private ZeppelinContext z;
-
public FlinkBatchSqlInterpreter(Properties properties) {
super(properties);
}
@Override
- protected boolean isBatch() {
- return true;
- }
-
- @Override
public void open() throws InterpreterException {
- this.flinkInterpreter =
- getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
- this.tbenv = flinkInterpreter.getJavaBatchTableEnvironment("blink");
- this.z = flinkInterpreter.getZeppelinContext();
super.open();
+ FlinkSqlContext flinkSqlContext = new FlinkSqlContext(
+ flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
+ flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
+ flinkInterpreter.getJavaBatchTableEnvironment("blink"),
+ flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+ flinkInterpreter.getZeppelinContext(),
+ null);
+ flinkInterpreter.getFlinkShims().initInnerBatchSqlInterpreter(flinkSqlContext);
}
@Override
- public void close() throws InterpreterException {
-
- }
-
- @Override
- public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
- Table table = this.tbenv.sqlQuery(sql);
- String result = z.showData(table);
- context.out.write(result);
- }
-
- @Override
- public void cancel(InterpreterContext context) throws InterpreterException {
- flinkInterpreter.cancel(context);
- }
-
- @Override
- public FormType getFormType() throws InterpreterException {
- return FormType.SIMPLE;
+ public InterpreterResult runSqlList(String st, InterpreterContext context) {
+ return flinkShims.runSqlList(st, context, true);
}
@Override
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index d629ba4..7e047cc 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -22,11 +22,7 @@ import org.apache.flink.api.scala.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableEnvironment;
-import org.apache.zeppelin.interpreter.Interpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
index c3ec4cd..8c55765 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterpreter.java
@@ -18,81 +18,30 @@
package org.apache.zeppelin.flink;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigOption;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.interpreter.AbstractInterpreter;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.ZeppelinContext;
-import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.apache.zeppelin.interpreter.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterpreter.class);
protected FlinkInterpreter flinkInterpreter;
- protected TableEnvironment tbenv;
- private SqlCommandParser sqlCommandParser;
- private SqlSplitter sqlSplitter;
- private int defaultSqlParallelism;
- private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
- // all the available sql config options. see
- // https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/
- private Map<String, ConfigOption> tableConfigOptions;
+ protected FlinkShims flinkShims;
+ protected ZeppelinContext z;
+
public FlinkSqlInterpreter(Properties properties) {
super(properties);
}
- protected abstract boolean isBatch();
-
@Override
public void open() throws InterpreterException {
- this.sqlCommandParser = new SqlCommandParser(flinkInterpreter.getFlinkShims(), tbenv);
- this.sqlSplitter = new SqlSplitter();
- JobListener jobListener = new JobListener() {
- @Override
- public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- LOGGER.info("UnLock JobSubmitLock");
- }
- }
-
- @Override
- public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
-
- }
- };
-
- flinkInterpreter.getExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
- flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(jobListener);
- this.defaultSqlParallelism = flinkInterpreter.getDefaultSqlParallelism();
- this.tableConfigOptions = flinkInterpreter.getFlinkShims().extractTableConfigOptions();
+ this.flinkInterpreter =
+ getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
+ this.flinkShims = flinkInterpreter.getFlinkShims();
}
@Override
@@ -121,435 +70,19 @@ public abstract class FlinkSqlInterpreter extends AbstractInterpreter {
}
}
- private InterpreterResult runSqlList(String st, InterpreterContext context) {
-
- try {
- boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
- List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
- boolean isFirstInsert = true;
- boolean hasInsert = false;
- for (String sql : sqls) {
- Optional<SqlCommandParser.SqlCommandCall> sqlCommand = sqlCommandParser.parse(sql);
- if (!sqlCommand.isPresent()) {
- try {
- context.out.write("%text Invalid Sql statement: " + sql + "\n");
- context.out.write(flinkInterpreter.getFlinkShims().sqlHelp());
- } catch (IOException e) {
- return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
- }
- return new InterpreterResult(InterpreterResult.Code.ERROR);
- }
- try {
- if (sqlCommand.get().command == SqlCommand.INSERT_INTO ||
- sqlCommand.get().command == SqlCommand.INSERT_OVERWRITE) {
- hasInsert = true;
- if (isFirstInsert && runAsOne) {
- flinkInterpreter.getFlinkShims().startMultipleInsert(tbenv, context);
- isFirstInsert = false;
- }
- }
- callCommand(sqlCommand.get(), context);
- context.out.flush();
- } catch (Throwable e) {
- LOGGER.error("Fail to run sql:" + sql, e);
- try {
- context.out.write("%text Fail to run sql command: " +
- sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
- } catch (IOException ex) {
- LOGGER.warn("Unexpected exception:", ex);
- return new InterpreterResult(InterpreterResult.Code.ERROR,
- ExceptionUtils.getStackTrace(e));
- }
- return new InterpreterResult(InterpreterResult.Code.ERROR);
- }
- }
-
- if (runAsOne && hasInsert) {
- try {
- lock.lock();
- String jobName = context.getStringLocalProperty("jobName", st);
- if (flinkInterpreter.getFlinkShims().executeMultipleInsertInto(jobName, this.tbenv, context)) {
- context.out.write("Insertion successfully.\n");
- }
- } catch (Exception e) {
- LOGGER.error("Fail to execute sql as one job", e);
- return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- }
- } catch(Exception e) {
- LOGGER.error("Fail to execute sql", e);
- return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
- } finally {
- // reset parallelism
- this.tbenv.getConfig().getConfiguration()
- .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
- defaultSqlParallelism);
- // reset table config
- for (ConfigOption configOption: tableConfigOptions.values()) {
- // some may has no default value, e.g. ExecutionConfigOptions#TABLE_EXEC_DISABLED_OPERATORS
- if (configOption.defaultValue() != null) {
- this.tbenv.getConfig().getConfiguration().set(configOption, configOption.defaultValue());
- }
- }
- this.tbenv.getConfig().getConfiguration().addAll(flinkInterpreter.getFlinkConfiguration());
- }
-
- return new InterpreterResult(InterpreterResult.Code.SUCCESS);
- }
-
- private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
- InterpreterContext context) throws Exception {
- switch (cmdCall.command) {
- case HELP:
- callHelp(context);
- break;
- case SHOW_CATALOGS:
- callShowCatalogs(context);
- break;
- case SHOW_DATABASES:
- callShowDatabases(context);
- break;
- case SHOW_TABLES:
- callShowTables(context);
- break;
- case SOURCE:
- callSource(cmdCall.operands[0], context);
- break;
- case CREATE_FUNCTION:
- callCreateFunction(cmdCall.operands[0], context);
- break;
- case DROP_FUNCTION:
- callDropFunction(cmdCall.operands[0], context);
- break;
- case ALTER_FUNCTION:
- callAlterFunction(cmdCall.operands[0], context);
- break;
- case SHOW_FUNCTIONS:
- callShowFunctions(context);
- break;
- case SHOW_MODULES:
- callShowModules(context);
- break;
- case USE_CATALOG:
- callUseCatalog(cmdCall.operands[0], context);
- break;
- case USE:
- callUseDatabase(cmdCall.operands[0], context);
- break;
- case CREATE_CATALOG:
- callCreateCatalog(cmdCall.operands[0], context);
- break;
- case DROP_CATALOG:
- callDropCatalog(cmdCall.operands[0], context);
- break;
- case DESC:
- case DESCRIBE:
- callDescribe(cmdCall.operands[0], context);
- break;
- case EXPLAIN:
- callExplain(cmdCall.operands[0], context);
- break;
- case SELECT:
- callSelect(cmdCall.operands[0], context);
- break;
- case SET:
- callSet(cmdCall.operands[0], cmdCall.operands[1], context);
- break;
- case INSERT_INTO:
- case INSERT_OVERWRITE:
- callInsertInto(cmdCall.operands[0], context);
- break;
- case CREATE_TABLE:
- callCreateTable(cmdCall.operands[0], context);
- break;
- case DROP_TABLE:
- callDropTable(cmdCall.operands[0], context);
- break;
- case CREATE_VIEW:
- callCreateView(cmdCall, context);
- break;
- case DROP_VIEW:
- callDropView(cmdCall, context);
- break;
- case CREATE_DATABASE:
- callCreateDatabase(cmdCall.operands[0], context);
- break;
- case DROP_DATABASE:
- callDropDatabase(cmdCall.operands[0], context);
- break;
- case ALTER_DATABASE:
- callAlterDatabase(cmdCall.operands[0], context);
- break;
- case ALTER_TABLE:
- callAlterTable(cmdCall.operands[0], context);
- break;
- default:
- throw new Exception("Unsupported command: " + cmdCall.command);
- }
- }
-
- private void callAlterTable(String sql, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- this.tbenv.sqlUpdate(sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("Table has been modified.\n");
- }
-
- private void callAlterDatabase(String sql, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- this.tbenv.sqlUpdate(sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("Database has been modified.\n");
- }
-
- private void callDropDatabase(String sql, InterpreterContext context) throws IOException {
- try {
- this.tbenv.sqlUpdate(sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("Database has been dropped.\n");
- }
-
- private void callCreateDatabase(String sql, InterpreterContext context) throws IOException {
- try {
- this.tbenv.sqlUpdate(sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("Database has been created.\n");
- }
-
- private void callDropView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("View has been dropped.\n");
- }
-
- private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("View has been created.\n");
- }
-
- private void callCreateTable(String sql, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- this.tbenv.sqlUpdate(sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("Table has been created.\n");
- }
-
- private void callDropTable(String sql, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- this.tbenv.sqlUpdate(sql);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- context.out.write("Table has been dropped.\n");
- }
+ public abstract InterpreterResult runSqlList(String st, InterpreterContext context);
- private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
- this.tbenv.useCatalog(catalog);
- }
-
- private void callCreateCatalog(String sql, InterpreterContext context) throws IOException {
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
- context.out.write("Catalog has been created.\n");
- }
-
- private void callDropCatalog(String sql, InterpreterContext context) throws IOException {
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
- context.out.write("Catalog has been dropped.\n");
- }
-
- private void callShowModules(InterpreterContext context) throws IOException {
- String[] modules = this.tbenv.listModules();
- context.out.write("%table module\n" + StringUtils.join(modules, "\n") + "\n");
- }
-
- private void callHelp(InterpreterContext context) throws IOException {
- context.out.write(flinkInterpreter.getFlinkShims().sqlHelp());
- }
-
- private void callShowCatalogs(InterpreterContext context) throws IOException {
- String[] catalogs = this.tbenv.listCatalogs();
- context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
- }
-
- private void callShowDatabases(InterpreterContext context) throws IOException {
- String[] databases = this.tbenv.listDatabases();
- context.out.write(
- "%table database\n" + StringUtils.join(databases, "\n") + "\n");
- }
-
- private void callShowTables(InterpreterContext context) throws IOException {
- List<String> tables =
- Arrays.asList(this.tbenv.listTables()).stream()
- .filter(tbl -> !tbl.startsWith("UnnamedTable")).collect(Collectors.toList());
- context.out.write(
- "%table table\n" + StringUtils.join(tables, "\n") + "\n");
- }
-
- private void callSource(String sqlFile, InterpreterContext context) throws IOException {
- String sql = IOUtils.toString(new FileInputStream(sqlFile));
- runSqlList(sql, context);
- }
-
- private void callCreateFunction(String sql, InterpreterContext context) throws IOException {
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
- context.out.write("Function has been created.\n");
- }
-
- private void callDropFunction(String sql, InterpreterContext context) throws IOException {
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
- context.out.write("Function has been dropped.\n");
- }
-
- private void callAlterFunction(String sql, InterpreterContext context) throws IOException {
- flinkInterpreter.getFlinkShims().executeSql(tbenv, sql);
- context.out.write("Function has been modified.\n");
- }
-
- private void callShowFunctions(InterpreterContext context) throws IOException {
- String[] functions = this.tbenv.listUserDefinedFunctions();
- context.out.write(
- "%table function\n" + StringUtils.join(functions, "\n") + "\n");
- }
-
- private void callUseDatabase(String databaseName,
- InterpreterContext context) throws IOException {
- tbenv.useDatabase(databaseName);
- }
-
- private void callDescribe(String name, InterpreterContext context) throws IOException {
- TableSchema schema = tbenv.scan(name.split("\\.")).getSchema();
- StringBuilder builder = new StringBuilder();
- builder.append("Column\tType\n");
- for (int i = 0; i < schema.getFieldCount(); ++i) {
- builder.append(schema.getFieldName(i).get() + "\t" + schema.getFieldDataType(i).get() + "\n");
- }
- context.out.write("%table\n" + builder.toString());
- }
-
- private void callExplain(String sql, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- context.out.write(this.flinkInterpreter.getFlinkShims().explain(tbenv, sql) + "\n");
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- }
-
- public void callSelect(String sql, InterpreterContext context) throws IOException {
- try {
- lock.lock();
- callInnerSelect(sql, context);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
- }
-
- public abstract void callInnerSelect(String sql, InterpreterContext context) throws IOException;
-
- private String removeSingleQuote(String value) {
- value = value.trim();
- if (value.startsWith("'")) {
- value = value.substring(1);
- }
- if (value.endsWith("'")) {
- value = value.substring(0, value.length() - 1);
- }
- return value;
- }
-
- public void callSet(String key, String value, InterpreterContext context) throws Exception {
- key = removeSingleQuote(key);
- value = removeSingleQuote(value);
-
- if ("execution.runtime-mode".equals(key)) {
- throw new UnsupportedOperationException("execution.runtime-mode is not supported to set, " +
- "you can use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode");
- }
-
- if (!tableConfigOptions.containsKey(key)) {
- throw new IOException(key + " is not a valid table/sql config, please check link: " +
- "https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/config/");
- }
-
- LOGGER.info("Set table config: {}={}", key, value);
- this.tbenv.getConfig().getConfiguration().setString(key, value);
+ @Override
+ public void cancel(InterpreterContext context) throws InterpreterException {
+ flinkInterpreter.cancel(context);
}
- public void callInsertInto(String sql,
- InterpreterContext context) throws IOException {
- if (!isBatch()) {
- context.getLocalProperties().put("flink.streaming.insert_into", "true");
- }
- try {
- lock.lock();
- boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
- if (!runAsOne) {
- this.tbenv.sqlUpdate(sql);
- String jobName = context.getStringLocalProperty("jobName", sql);
- this.tbenv.execute(jobName);
- context.out.write("Insertion successfully.\n");
- } else {
- flinkInterpreter.getFlinkShims().addInsertStatement(sql, this.tbenv, context);
- }
- } catch (Exception e) {
- throw new IOException(e);
- } finally {
- if (lock.isHeldByCurrentThread()) {
- lock.unlock();
- }
- }
+ @Override
+ public FormType getFormType() throws InterpreterException {
+ return FormType.SIMPLE;
}
@Override
- public void cancel(InterpreterContext context) throws InterpreterException {
- this.flinkInterpreter.cancel(context);
+ public void close() throws InterpreterException {
}
-
}
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index 23aadf2..6bffb23 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -21,9 +21,9 @@ package org.apache.zeppelin.flink;
import org.apache.zeppelin.flink.sql.AppendStreamSqlJob;
import org.apache.zeppelin.flink.sql.SingleRowStreamSqlJob;
import org.apache.zeppelin.flink.sql.UpdateStreamSqlJob;
-import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
@@ -37,35 +37,35 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
}
@Override
- protected boolean isBatch() {
- return false;
- }
-
- @Override
public void open() throws InterpreterException {
- this.flinkInterpreter =
- getInterpreterInTheSameSessionByClassName(FlinkInterpreter.class);
- this.tbenv = flinkInterpreter.getJavaStreamTableEnvironment("blink");
super.open();
- }
-
- @Override
- public void close() throws InterpreterException {
+ FlinkSqlContext flinkSqlContext = new FlinkSqlContext(
+ flinkInterpreter.getExecutionEnvironment().getJavaEnv(),
+ flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv(),
+ flinkInterpreter.getJavaBatchTableEnvironment("blink"),
+ flinkInterpreter.getJavaStreamTableEnvironment("blink"),
+ flinkInterpreter.getZeppelinContext(),
+ sql -> callInnerSelect(sql));
+ flinkInterpreter.getFlinkShims().initInnerStreamSqlInterpreter(flinkSqlContext);
}
- @Override
- public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
+ public void callInnerSelect(String sql) {
+ InterpreterContext context = InterpreterContext.get();
String streamType = context.getLocalProperties().getOrDefault("type", "update");
if (streamType.equalsIgnoreCase("single")) {
SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
- tbenv,
+ flinkInterpreter.getJavaStreamTableEnvironment("blink"),
flinkInterpreter.getJobManager(),
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
- streamJob.run(sql);
+ try {
+ streamJob.run(sql);
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to run single type stream job", e);
+ }
} else if (streamType.equalsIgnoreCase("append")) {
AppendStreamSqlJob streamJob = new AppendStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
@@ -74,7 +74,11 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
- streamJob.run(sql);
+ try {
+ streamJob.run(sql);
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to run append type stream job", e);
+ }
} else if (streamType.equalsIgnoreCase("update")) {
UpdateStreamSqlJob streamJob = new UpdateStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
@@ -83,24 +87,19 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterpreter {
context,
flinkInterpreter.getDefaultParallelism(),
flinkInterpreter.getFlinkShims());
- streamJob.run(sql);
+ try {
+ streamJob.run(sql);
+ } catch (IOException e) {
+ throw new RuntimeException("Fail to run update type stream job", e);
+ }
} else {
- throw new IOException("Unrecognized stream type: " + streamType);
+ throw new RuntimeException("Unrecognized stream type: " + streamType);
}
}
@Override
- public void callInsertInto(String sql, InterpreterContext context) throws IOException {
- super.callInsertInto(sql, context);
- }
-
- public void cancel(InterpreterContext context) throws InterpreterException {
- this.flinkInterpreter.cancel(context);
- }
-
- @Override
- public Interpreter.FormType getFormType() throws InterpreterException {
- return Interpreter.FormType.SIMPLE;
+ public InterpreterResult runSqlList(String st, InterpreterContext context) {
+ return flinkShims.runSqlList(st, context, false);
}
@Override
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index e3526eb..41b3014 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -37,7 +37,7 @@ import java.util.Properties;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
-public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
+public class FlinkBatchSqlInterpreterTest extends FlinkSqlInterpreterTest {
@Override
protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) {
@@ -201,68 +201,46 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
context = getInterpreterContext();
result = sqlInterpreter.interpret(
"insert into sink_table select * from source_table", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
+ assertEquals(context.out.toString(), InterpreterResult.Code.ERROR, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertTrue(resultMessages.get(0).getData(),
resultMessages.get(0).getData().contains("already exists"));
- // insert overwrite into
- // context = getInterpreterContext();
- // result = sqlInterpreter.interpret(
- // "insert overwrite dest_table select id + 1, name from source_table", context);
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- // resultMessages = context.out.toInterpreterResultMessage();
- // assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
- //
- // // verify insert into via select from the dest_table
- // context = getInterpreterContext();
- // result = sqlInterpreter.interpret(
- // "select * from dest_table", context);
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- // resultMessages = context.out.toInterpreterResultMessage();
- // assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
- //
- // // define scala udf
- // result = flinkInterpreter.interpret(
- // "class AddOne extends ScalarFunction {\n" +
- // " def eval(a: Int): Int = a + 1\n" +
- // "}", getInterpreterContext());
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- //
- // result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
- // getInterpreterContext());
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- //
- // // insert into dest_table2 using udf
- // destDir = Files.createTempDirectory("flink_test").toFile();
- // FileUtils.deleteDirectory(destDir);
- // result = sqlInterpreter.interpret(
- // "CREATE TABLE dest_table2 (\n" +
- // "id int,\n" +
- // "name string" +
- // ") WITH (\n" +
- // "'format.field-delimiter'=',',\n" +
- // "'connector.type'='filesystem',\n" +
- // "'format.derive-schema'='true',\n" +
- // "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
- // "'format.type'='csv'\n" +
- // ");", getInterpreterContext());
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- //
- // context = getInterpreterContext();
- // result = sqlInterpreter.interpret(
- // "insert into dest_table2 select addOne(id), name from source_table", context);
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- // resultMessages = context.out.toInterpreterResultMessage();
- // assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
- //
- // // verify insert into via select from the dest table
- // context = getInterpreterContext();
- // result = sqlInterpreter.interpret(
- // "select * from dest_table2", context);
- // assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- // resultMessages = context.out.toInterpreterResultMessage();
- // assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
+ // insert into again will succeed after destDir is deleted
+ destDir.delete();
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "insert into sink_table select * from source_table", context);
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+
+ // define scala udf
+ result = flinkInterpreter.interpret(
+ "class AddOne extends ScalarFunction {\n" +
+ " def eval(a: Int): Int = a + 1\n" +
+ "}", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ result = flinkInterpreter.interpret("btenv.registerFunction(\"addOne\", new AddOne())",
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // insert into sink_table again using udf
+ destDir.delete();
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "insert into sink_table select addOne(id), name from source_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
+
+ // verify insert into via select from the dest table
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "select * from sink_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals("id\tname\n2\ta\n3\tb\n", resultMessages.get(0).getData());
}
@Test
@@ -293,8 +271,8 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
- assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
- sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
+ assertEquals(10,
+ flinkInterpreter.getBatchTableEnvironment().getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM).intValue());
// set then insert into
destDir.delete();
@@ -305,21 +283,10 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
assertEquals("Insertion successfully.\n", resultMessages.get(0).getData());
- assertEquals(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.defaultValue(),
- sqlInterpreter.tbenv.getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM));
- assertEquals(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED.defaultValue(),
- sqlInterpreter.tbenv.getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED));
-
- // invalid config
- destDir.delete();
- context = getInterpreterContext();
- result = sqlInterpreter.interpret(
- "set table.invalid_config=false;" +
- "insert into sink_table select * from source_table", context);
- assertEquals(InterpreterResult.Code.ERROR, result.code());
- resultMessages = context.out.toInterpreterResultMessage();
- assertTrue(resultMessages.get(0).getData(),
- resultMessages.get(0).getData().contains("table.invalid_config is not a valid table/sql config"));
+ assertEquals(10,
+ flinkInterpreter.getBatchTableEnvironment().getConfig().getConfiguration().get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM).intValue());
+ assertEquals(false,
+ flinkInterpreter.getBatchTableEnvironment().getConfig().getConfiguration().get(OptimizerConfigOptions.TABLE_OPTIMIZER_SOURCE_PREDICATE_PUSHDOWN_ENABLED).booleanValue());
}
@Test
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index c3939db..b029304 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -19,17 +19,11 @@ package org.apache.zeppelin.flink;
import junit.framework.TestCase;
import net.jodah.concurrentunit.Waiter;
-import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.ui.CheckBox;
import org.apache.zeppelin.display.ui.Select;
import org.apache.zeppelin.display.ui.TextBox;
-import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.apache.zeppelin.interpreter.InterpreterException;
-import org.apache.zeppelin.interpreter.InterpreterGroup;
-import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterResult;
-import org.apache.zeppelin.interpreter.InterpreterResultMessage;
+import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.junit.After;
@@ -40,14 +34,13 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeoutException;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.mock;
@@ -346,28 +339,34 @@ public class FlinkInterpreterTest {
resultMessages.get(0).getData().contains("url\tpv\n"));
}
- // TODO(zjffdu) flaky test
- // @Test
- public void testResumeStreamSqlFromSavePoint() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+ @Test
+ public void testResumeStreamSqlFromSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+ if (!interpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+ LOGGER.info("Skip testResumeStreamSqlFromSavePointPath, because this test is only passed after Flink 1.14 due to FLINK-23654");
+ // By default, this thread pool in Flink JobManager is the number of cpu cores. While the cpu cores in github action container is too small
+ return;
+ }
+
String initStreamScalaScript = FlinkStreamSqlInterpreterTest.getInitStreamScript(1000);
InterpreterResult result = interpreter.interpret(initStreamScalaScript,
getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- File savePointDir = FileUtils.getTempDirectory();
+ File savePointDir = Files.createTempDirectory("zeppelin-flink").toFile();
final Waiter waiter = new Waiter();
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
InterpreterResult result2 = interpreter.interpret(
"val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
"log group by url\")\nz.show(table, streamType=\"update\")", context);
- System.out.println("------------" + context.out.toString());
- System.out.println("------------" + result2);
+ LOGGER.info("------------" + context.out.toString());
+ LOGGER.info("------------" + result2);
waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+ // Flink job is succeed when it is cancelled with save point.
waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
} catch (Exception e) {
e.printStackTrace();
@@ -378,16 +377,25 @@ public class FlinkInterpreterTest {
thread.start();
// the streaming job will run for 60 seconds. check init_stream.scala
- // sleep 20 seconds to make sure the job is started but not finished
- Thread.sleep(20 * 1000);
+ // sleep 30 seconds to make sure the job is started but not finished
+ Thread.sleep(30 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");
interpreter.cancel(context);
- waiter.await(20 * 1000);
+ waiter.await(30 * 1000);
+
+ // verify save point is generated
+ String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
+ TestCase.assertTrue(allSavepointPath.length > 0);
+
// resume job from savepoint
+ context = getInterpreterContext();
+ context.getLocalProperties().put(JobManager.SAVEPOINT_PATH, allSavepointPath[0]);
+ context.getLocalProperties().put("parallelism", "2");
+ context.getLocalProperties().put("maxParallelism", "10");
interpreter.interpret(
"val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
"log group by url\")\nz.show(table, streamType=\"update\")", context);
@@ -396,6 +404,11 @@ public class FlinkInterpreterTest {
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
TestCase.assertTrue(resultMessages.toString(),
resultMessages.get(0).getData().contains("url\tpv\n"));
+ assertEquals(resultMessages.toString(), "url\tpv\n" +
+ "home\t10\n" +
+ "product\t30\n" +
+ "search\t20\n",
+ resultMessages.get(0).getData());
}
private InterpreterContext getInterpreterContext() {
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
similarity index 91%
rename from flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
rename to flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
index 12862d2..2673ff0 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkSqlInterpreterTest.java
@@ -69,9 +69,9 @@ import static org.mockito.Mockito.mock;
@RunWith(FlinkStandaloneHiveRunner.class)
-public abstract class SqlInterpreterTest {
+public abstract class FlinkSqlInterpreterTest {
- private static final Logger LOGGER = LoggerFactory.getLogger(SqlInterpreterTest.class);
+ protected static final Logger LOGGER = LoggerFactory.getLogger(FlinkSqlInterpreterTest.class);
protected FlinkInterpreter flinkInterpreter;
@@ -94,7 +94,7 @@ public abstract class SqlInterpreterTest {
p.setProperty("zeppelin.pyflink.useIPython", "false");
p.setProperty("local.number-taskmanager", "4");
p.setProperty("zeppelin.python.gatewayserver_address", "127.0.0.1");
-
+
File hiveConfDir = Files.createTempDir();
hiveShell.getHiveConf().writeXml(new FileWriter(new File(hiveConfDir, "hive-site.xml")));
p.setProperty("HIVE_CONF_DIR", hiveConfDir.getAbsolutePath());
@@ -176,6 +176,15 @@ public abstract class SqlInterpreterTest {
result = sqlInterpreter.interpret("use db1", context);
assertEquals(Code.SUCCESS, result.code());
+ // show current database
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("show current database", context);
+ assertEquals(Code.SUCCESS, result.code());
+ assertEquals(1, context.out.toInterpreterResultMessage().size());
+ assertEquals(Type.TEXT, context.out.toInterpreterResultMessage().get(0).getType());
+ assertEquals("current database: db1\n", context.out.toInterpreterResultMessage().get(0).getData());
+
+ // show tables
context = getInterpreterContext();
result = sqlInterpreter.interpret("show tables", context);
assertEquals(Code.SUCCESS, result.code());
@@ -183,10 +192,12 @@ public abstract class SqlInterpreterTest {
assertEquals(Type.TABLE, resultMessages.get(0).getType());
assertEquals("table\n", resultMessages.get(0).getData());
+ // create table
context = getInterpreterContext();
result = sqlInterpreter.interpret("CREATE TABLE source (msg INT) with ('connector'='print')", context);
assertEquals(Code.SUCCESS, result.code());
+ // show tables
context = getInterpreterContext();
result = sqlInterpreter.interpret("show tables", context);
assertEquals(Code.SUCCESS, result.code());
@@ -194,6 +205,7 @@ public abstract class SqlInterpreterTest {
assertEquals(Type.TABLE, resultMessages.get(0).getType());
assertEquals("table\nsource\n", resultMessages.get(0).getData());
+ // describe table
context = getInterpreterContext();
result = sqlInterpreter.interpret("describe db1.source", context);
assertEquals(Code.SUCCESS, result.code());
@@ -203,14 +215,17 @@ public abstract class SqlInterpreterTest {
"msg\tINT\n"
, resultMessages.get(0).getData());
+ // use database
context = getInterpreterContext();
- result = sqlInterpreter.interpret("use default", context);
- assertEquals(Code.SUCCESS, result.code());
+ result = sqlInterpreter.interpret("use `default`", context);
+ assertEquals(context.out.toString(), Code.SUCCESS, result.code());
+ // show tables
context = getInterpreterContext();
result = sqlInterpreter.interpret("show tables", context);
assertEquals(Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(1, resultMessages.size());
assertEquals(Type.TABLE, resultMessages.get(0).getType());
assertEquals("table\n", resultMessages.get(0).getData());
@@ -257,7 +272,7 @@ public abstract class SqlInterpreterTest {
// describe table
context = getInterpreterContext();
result = sqlInterpreter.interpret("describe source_table", context);
- assertEquals(Code.SUCCESS, result.code());
+ assertEquals(result.toString(), Code.SUCCESS, result.code());
assertEquals(1, resultMessages.size());
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(Type.TABLE, resultMessages.get(0).getType());
@@ -275,7 +290,7 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertTrue(resultMessages.toString(),
- resultMessages.get(0).getData().contains("Table `unknown_table` was not found."));
+ resultMessages.get(0).getData().contains("doesn't exist"));
// drop unknown table
context = getInterpreterContext();
@@ -301,7 +316,7 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertTrue(resultMessages.get(0).getData(),
- resultMessages.get(0).getData().contains("Table `source_table` was not found"));
+ resultMessages.get(0).getData().contains("doesn't exist"));
}
@Test
@@ -414,7 +429,6 @@ public abstract class SqlInterpreterTest {
@Test
public void testInvalidSql() throws InterpreterException, IOException {
-
InterpreterContext context = getInterpreterContext();
InterpreterResult result = sqlInterpreter.interpret("Invalid sql", context);
assertEquals(Code.ERROR, result.code());
@@ -422,12 +436,9 @@ public abstract class SqlInterpreterTest {
assertEquals(1, resultMessages.size());
assertEquals(Type.TEXT, resultMessages.get(0).getType());
assertTrue(resultMessages.get(0).getData(),
- resultMessages.get(0).getData().contains("Invalid Sql statement: Invalid sql"));
- assertTrue(resultMessages.get(0).getData(),
- resultMessages.get(0).getData().contains("The following commands are available"));
+ resultMessages.get(0).getData().contains("Invalid Sql statement"));
}
-
@Test
public void testFunction() throws IOException, InterpreterException {
InterpreterContext context = getInterpreterContext();
@@ -447,23 +458,20 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("myudf"));
-
// ALTER
context = getInterpreterContext();
result = sqlInterpreter.interpret(
"ALTER FUNCTION myUDF AS 'org.apache.zeppelin.flink.JavaLower' ; ", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
- assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been modified."));
-
+ assertEquals("Alter function succeeded!\n",resultMessages.get(0).getData());
// DROP UDF
context = getInterpreterContext();
result = sqlInterpreter.interpret("DROP FUNCTION myudf ;", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
- assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been dropped."));
-
+ assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Function has been removed."));
// SHOW UDF. Due to drop UDF before, it shouldn't contain 'myudf'
result = sqlInterpreter.interpret(
@@ -485,7 +493,7 @@ public abstract class SqlInterpreterTest {
");", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
- assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("Catalog has been created."));
+ assertTrue(context.out.toString(), resultMessages.get(0).getData().contains("Catalog has been created."));
// USE CATALOG & SHOW DATABASES;
context = getInterpreterContext();
@@ -496,6 +504,13 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default"));
+ // SHOW CURRENT CATALOG
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("SHOW CURRENT CATALOG", context);
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals("current catalog: test_catalog\n", resultMessages.get(0).getData());
+
// DROP CATALOG
context = getInterpreterContext();
result = sqlInterpreter.interpret(
@@ -510,37 +525,35 @@ public abstract class SqlInterpreterTest {
"SHOW CATALOGS ;\n", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
resultMessages = context.out.toInterpreterResultMessage();
- assertTrue(resultMessages.toString(),resultMessages.get(0).getData().contains("default_catalog"));
- assertFalse(resultMessages.toString(),resultMessages.get(0).getData().contains("test_catalog"));
+ assertTrue(context.out.toString(), resultMessages.get(0).getData().contains("default_catalog"));
+ assertFalse(context.out.toString(), resultMessages.get(0).getData().contains("test_catalog"));
}
@Test
public void testSetProperty() throws InterpreterException {
- FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+ // set time-zone with single quote
InterpreterContext context = getInterpreterContext();
- InterpreterResult result = sqlInterpreter.interpret(
- "set table.sql-dialect=hive", context);
+ InterpreterResult result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
- sqlInterpreter.interpret("create table test_hive_table(a string, b int)\n" +
- "partitioned by (dt string)", context);
+ // set table.sql-dialect without quote
+ result = sqlInterpreter.interpret(
+ "set table.sql-dialect=hive", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
- // table.local-time-zone is only available from 1.12
- if (flinkVersion.newerThanOrEqual(FlinkVersion.fromVersionString("1.12.0"))) {
- context = getInterpreterContext();
- result = sqlInterpreter.interpret("SET 'table.local-time-zone' = 'UTC'", context);
- assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
- }
+ // show all settings
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("SET", context);
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+ assertTrue(context.out.toString(), context.out.toString().contains("'table.sql-dialect' = 'hive"));
+ assertTrue(context.out.toString(), context.out.toString().contains("'table.local-time-zone' = 'UTC"));
}
@Test
- public void testShowModules() throws InterpreterException, IOException {
- FlinkVersion flinkVersion = flinkInterpreter.getFlinkVersion();
+ public void testModules() throws InterpreterException, IOException {
InterpreterContext context = getInterpreterContext();
-
- // CREATE CATALOG
+ // show catalogs
InterpreterResult result = sqlInterpreter.interpret(
"show modules", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -548,7 +561,6 @@ public abstract class SqlInterpreterTest {
assertTrue(resultMessages.toString(), resultMessages.get(0).getData().contains("core"));
}
-
protected InterpreterContext getInterpreterContext() {
InterpreterContext context = InterpreterContext.builder()
.setParagraphId("paragraphId")
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 4b3ebf9..9894764 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -36,7 +36,7 @@ import java.util.concurrent.TimeoutException;
import static junit.framework.TestCase.assertTrue;
import static org.junit.Assert.assertEquals;
-public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
+public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
@Override
protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) {
@@ -70,7 +70,8 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
context = getInterpreterContext();
- String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\nz.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))";
+ String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\n" +
+ "z.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))";
result = flinkInterpreter.interpret(code, context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
@@ -160,6 +161,10 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
@Test
public void testCancelStreamSql() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+ // clean checkpoint dir first, checkpoint dir is defined in init_stream.scala
+ File checkpointDir = new File("/tmp/flink/checkpoints");
+ FileUtils.deleteDirectory(checkpointDir);
+
String initStreamScalaScript = getInitStreamScript(1000);
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
getInterpreterContext());
@@ -181,13 +186,17 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
});
thread.start();
- // the streaming job will run for 20 seconds. check init_stream.scala
- // sleep 10 seconds to make sure the job is started but not finished
- Thread.sleep(10 * 1000);
+ // the streaming job will run for 60 seconds. check init_stream.scala
+ // sleep 30 seconds to make sure the job is started but not finished
+ Thread.sleep(30 * 1000);
InterpreterContext context = getInterpreterContext();
sqlInterpreter.cancel(context);
- waiter.await(10 * 1000);
+ waiter.await(30 * 1000);
+
+ // verify checkpoints
+ assertTrue(checkpointDir.listFiles(f -> f.isDirectory()).length > 0);
+
// resume job
sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
@@ -198,117 +207,154 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
resultMessages.get(0).getData().contains("url\tpv\n"));
}
- // TODO(zjffdu) flaky test
- // @Test
- public void testResumeStreamSqlFromSavePoint() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+ @Test
+ public void testResumeStreamSqlFromSavePointDir() throws IOException, InterpreterException, InterruptedException, TimeoutException {
+ if (!flinkInterpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+ LOGGER.info("Skip testResumeStreamSqlFromSavePointPath, because this test is only passed after Flink 1.14 due to FLINK-23654");
+ // By default, this thread pool in Flink JobManager is the number of cpu cores. While the cpu cores in github action container is too small
+ return;
+ }
+
String initStreamScalaScript = getInitStreamScript(1000);
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- File savePointDir = FileUtils.getTempDirectory();
+ File savePointDir = Files.createTempDirectory("zeppelin-flink").toFile();
final Waiter waiter = new Waiter();
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
- System.out.println("------------" + context.out.toString());
- System.out.println("------------" + result2);
+ LOGGER.info("------------" + context.out.toString());
+ LOGGER.info("------------" + result2);
waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+ // Flink job is succeed when it is cancelled with save point.
waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error("Should not throw exception", e);
waiter.fail("Should not fail here");
}
waiter.resume();
});
thread.start();
- // the streaming job will run for 20 seconds. check init_stream.scala
- // sleep 10 seconds to make sure the job is started but not finished
- Thread.sleep(10 * 1000);
+ // the streaming job will run for 60 seconds. check init_stream.scala
+ // sleep 30 seconds to make sure the job is started but not finished
+ Thread.sleep(30 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");
sqlInterpreter.cancel(context);
- waiter.await(10 * 1000);
+ waiter.await(30 * 1000);
+
+ // verify save point is generated
+ String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
+ assertTrue(allSavepointPath.length > 0);
+
// resume job from savepoint
+ context = getInterpreterContext();
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
+ context.getLocalProperties().put("parallelism", "2");
+ context.getLocalProperties().put("maxParallelism", "10");
sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
- assertTrue(resultMessages.toString(),
- resultMessages.get(0).getData().contains("url\tpv\n"));
+ assertEquals(resultMessages.toString(), "url\tpv\n" +
+ "home\t10\n" +
+ "product\t30\n" +
+ "search\t20\n",
+ resultMessages.get(0).getData());
}
- // TODO(zjffdu) flaky test
- //@Test
+ @Test
public void testResumeStreamSqlFromExistSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
- String initStreamScalaScript = getInitStreamScript(2000);
+ if (!flinkInterpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+ LOGGER.info("Skip testResumeStreamSqlFromSavePointPath, because this test is only passed after Flink 1.14 due to FLINK-23654");
+ // By default, this thread pool in Flink JobManager is the number of cpu cores. While the cpu cores in github action container is too small
+ return;
+ }
+
+ String initStreamScalaScript = getInitStreamScript(1000);
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- File savePointDir = FileUtils.getTempDirectory();
+ File savePointDir = Files.createTempDirectory("zeppelin-flink").toFile();
final Waiter waiter = new Waiter();
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
+ LOGGER.info("------------" + context.out.toString());
+ LOGGER.info("------------" + result2);
waiter.assertTrue(context.out.toString().contains("url\tpv\n"));
+ // Flink job is succeed when it is cancelled with save point.
waiter.assertEquals(InterpreterResult.Code.SUCCESS, result2.code());
} catch (Exception e) {
- e.printStackTrace();
+ LOGGER.error("Should not throw exception", e);
waiter.fail("Should not fail here");
}
waiter.resume();
});
thread.start();
- // the streaming job will run for 20 seconds. check init_stream.scala
- // sleep 10 seconds to make sure the job is started but not finished
- Thread.sleep(10 * 1000);
+ // the streaming job will run for 60 seconds. check init_stream.scala
+ // sleep 30 seconds to make sure the job is started but not finished
+ Thread.sleep(30 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
+ context.getLocalProperties().put(JobManager.SAVEPOINT_DIR, savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");
sqlInterpreter.cancel(context);
- waiter.await(10 * 1000);
+ waiter.await(30 * 1000);
// get exist savepoint path from tempDirectory
// if dir more than 1 then get first or throw error
String[] allSavepointPath = savePointDir.list((dir, name) -> name.startsWith("savepoint"));
- assertTrue(allSavepointPath.length>0);
+ assertTrue(allSavepointPath.length > 0);
String savepointPath = savePointDir.getAbsolutePath().concat(File.separator).concat(allSavepointPath[0]);
// resume job from exist savepointPath
+ context = getInterpreterContext();
context.getConfig().put(JobManager.SAVEPOINT_PATH,savepointPath);
+ context.getLocalProperties().put("parallelism", "2");
+ context.getLocalProperties().put("maxParallelism", "10");
sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
- assertTrue(resultMessages.toString(),
- resultMessages.get(0).getData().contains("url\tpv\n"));
-
+ assertEquals(resultMessages.toString(), "url\tpv\n" +
+ "home\t10\n" +
+ "product\t30\n" +
+ "search\t20\n",
+ resultMessages.get(0).getData());
}
@Test
public void testResumeStreamSqlFromInvalidSavePointPath() throws IOException, InterpreterException, InterruptedException, TimeoutException {
- String initStreamScalaScript = getInitStreamScript(2000);
+ if (!flinkInterpreter.getFlinkShims().getFlinkVersion().isAfterFlink114()) {
+ LOGGER.info("Skip testResumeStreamSqlFromSavePointPath, because this test is only passed after Flink 1.14 due to FLINK-23654");
+ // By default, this thread pool in Flink JobManager is the number of cpu cores. While the cpu cores in github action container is too small
+ return;
+ }
+
+ String initStreamScalaScript = getInitStreamScript(1000);
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
getInterpreterContext());
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -343,10 +389,11 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
"log group by url", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
-// assertEquals(InterpreterResult.Type.TABLE,
-// updatedOutput.toInterpreterResultMessage().getType());
-// assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
-// !updatedOutput.toInterpreterResultMessage().getData().isEmpty());
+ assertTrue(context.out.toString(), !context.out.toInterpreterResultMessage().isEmpty());
+ assertEquals(InterpreterResult.Type.TABLE,
+ context.out.toInterpreterResultMessage().get(0).getType());
+ assertTrue(context.out.toInterpreterResultMessage().get(0).getData(),
+ !context.out.toInterpreterResultMessage().get(0).getData().isEmpty());
}
@Test
@@ -389,6 +436,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
@Test
public void testMultipleInsertInto() throws InterpreterException, IOException {
hiveShell.execute("create table source_table (id int, name string)");
+ hiveShell.execute("insert into source_table values(1, 'name')");
File destDir = Files.createTempDirectory("flink_test").toFile();
FileUtils.deleteDirectory(destDir);
@@ -426,16 +474,29 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
result = sqlInterpreter.interpret(
"insert into dest_table select * from source_table;insert into dest_table2 select * from source_table",
context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ // check dest_table
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("c\n1\n", context.out.toString());
+ // check dest_table2
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("c\n1\n", context.out.toString());
// runAsOne won't affect the select statement.
context = getInterpreterContext();
context.getLocalProperties().put("runAsOne", "true");
result = sqlInterpreter.interpret(
- "select 1",
+ "select 1 as a",
context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("a\n1\n", context.out.toString());
}
@Test
diff --git a/flink/flink-scala-parent/src/test/resources/init_stream.scala b/flink/flink-scala-parent/src/test/resources/init_stream.scala
index f8d27ae..14993ea 100644
--- a/flink/flink-scala-parent/src/test/resources/init_stream.scala
+++ b/flink/flink-scala-parent/src/test/resources/init_stream.scala
@@ -4,9 +4,16 @@ import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed
import java.util.Collections
import scala.collection.JavaConversions._
+import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
+import org.apache.flink.runtime.state.filesystem.FsStateBackend
+
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-senv.enableCheckpointing(5000)
+senv.enableCheckpointing(10000)
+senv.setStateBackend(new FsStateBackend("file:///tmp/flink/checkpoints"));
+
+val chkConfig = senv.getCheckpointConfig
+chkConfig.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
val data = senv.addSource(new SourceFunction[(Long, String)] with ListCheckpointed[java.lang.Long] {
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 640eba6..2a3d82e 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -19,11 +19,8 @@ package org.apache.zeppelin.flink;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,8 +28,6 @@ import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
/**
@@ -47,13 +42,15 @@ public abstract class FlinkShims {
protected Properties properties;
protected FlinkVersion flinkVersion;
+ protected FlinkSqlContext flinkSqlContext;
public FlinkShims(FlinkVersion flinkVersion, Properties properties) {
this.flinkVersion = flinkVersion;
this.properties = properties;
}
- private static FlinkShims loadShims(FlinkVersion flinkVersion, Properties properties)
+ private static FlinkShims loadShims(FlinkVersion flinkVersion,
+ Properties properties)
throws Exception {
Class<?> flinkShimsClass;
if (flinkVersion.getMajorVersion() == 1 && flinkVersion.getMinorVersion() == 12) {
@@ -87,21 +84,14 @@ public abstract class FlinkShims {
return flinkShims;
}
- protected static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) {
- return new AttributedStringBuilder()
- .style(AttributedStyle.DEFAULT.bold())
- .append(cmd.toString())
- .append("\t\t")
- .style(AttributedStyle.DEFAULT)
- .append(description)
- .append('\n')
- .toAttributedString();
- }
-
public FlinkVersion getFlinkVersion() {
return flinkVersion;
}
+ public abstract void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext);
+
+ public abstract void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext);
+
public abstract void disableSysoutLogging(Object batchConfig, Object streamConfig);
public abstract Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment);
@@ -116,12 +106,6 @@ public abstract class FlinkShims {
public abstract List collectToList(Object table) throws Exception;
- public abstract void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception;
-
- public abstract void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception;
-
- public abstract boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception;
-
public abstract boolean rowEquals(Object row1, Object row2);
public abstract Object fromDataSet(Object btenv, Object ds);
@@ -138,22 +122,12 @@ public abstract class FlinkShims {
public abstract void registerTableSink(Object stenv, String tableName, Object collectTableSink);
- public abstract Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt);
-
- public abstract void executeSql(Object tableEnv, String sql);
-
- public abstract String explain(Object tableEnv, String sql);
-
- public abstract String sqlHelp();
-
public abstract void setCatalogManagerSchemaResolver(Object catalogManager,
Object parser,
Object environmentSetting);
public abstract Object updateEffectiveConfig(Object cliFrontend, Object commandLine, Object executorConfig);
- public abstract Map extractTableConfigOptions();
-
public void setBatchRuntimeMode(Object tableConfig) {
// only needed after flink 1.13
}
@@ -169,4 +143,6 @@ public abstract class FlinkShims {
public abstract ImmutablePair<Object, Object> createPlannerAndExecutor(
ClassLoader classLoader, Object environmentSettings, Object sEnv,
Object tableConfig, Object functionCatalog, Object catalogManager);
+
+ public abstract InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch);
}
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkSqlContext.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkSqlContext.java
new file mode 100644
index 0000000..769a51f
--- /dev/null
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkSqlContext.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.zeppelin.flink;
+
+import java.util.function.Consumer;
+
+
+/**
+ * Context class which is used to pass context from flink-scala-x module to flink-shims module.
+ */
+public class FlinkSqlContext {
+
+ private Object benv;
+ private Object senv;
+ private Object btenv;
+ private Object stenv;
+ private Object z;
+ private Consumer<String> streamSqlSelectConsumer;
+
+ public FlinkSqlContext(Object benv,
+ Object senv,
+ Object btenv,
+ Object stenv,
+ Object z,
+ Consumer<String> streamSqlSelectConsumer) {
+ this.benv = benv;
+ this.senv = senv;
+ this.btenv = btenv;
+ this.stenv = stenv;
+ this.z = z;
+ this.streamSqlSelectConsumer = streamSqlSelectConsumer;
+ }
+
+ public Object getBenv() {
+ return benv;
+ }
+
+ public Object getSenv() {
+ return senv;
+ }
+
+ public Object getBtenv() {
+ return btenv;
+ }
+
+ public Object getStenv() {
+ return stenv;
+ }
+
+ public Object getZeppelinContext() {
+ return z;
+ }
+
+ public Consumer<String> getStreamSqlSelectConsumer() {
+ return streamSqlSelectConsumer;
+ }
+}
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
deleted file mode 100644
index 1a4f0e3..0000000
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.zeppelin.flink.sql;
-
-import org.apache.zeppelin.flink.FlinkShims;
-
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.regex.Pattern;
-
-/**
- * Simple parser for determining the type of command and its parameters.
- * All the SqlCommands should be put into this class, and the parsing logic needs to be put ito FlinkShims
- * because each version of flink has different sql syntax support.
- */
-public final class SqlCommandParser {
-
- private FlinkShims flinkShims;
- private Object tableEnv;
-
- public SqlCommandParser(FlinkShims flinkShims, Object tableEnv) {
- this.flinkShims = flinkShims;
- this.tableEnv = tableEnv;
- }
-
- public Optional<SqlCommandCall> parse(String stmt) {
- return flinkShims.parseSql(tableEnv, stmt);
- }
-
- // --------------------------------------------------------------------------------------------
-
- private static final Function<String[], Optional<String[]>> NO_OPERANDS =
- (operands) -> Optional.of(new String[0]);
-
- private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
- (operands) -> Optional.of(new String[]{operands[0]});
-
- private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
-
- /**
- * Supported SQL commands.
- */
- public enum SqlCommand {
- QUIT(
- "(QUIT|EXIT)",
- NO_OPERANDS),
-
- CLEAR(
- "CLEAR",
- NO_OPERANDS),
-
- HELP(
- "HELP",
- NO_OPERANDS),
-
- SHOW_CATALOGS(
- "SHOW\\s+CATALOGS",
- NO_OPERANDS),
-
- SHOW_DATABASES(
- "SHOW\\s+DATABASES",
- NO_OPERANDS),
-
- SHOW_TABLES(
- "SHOW\\s+TABLES",
- NO_OPERANDS),
-
- SHOW_FUNCTIONS(
- "SHOW\\s+FUNCTIONS",
- NO_OPERANDS),
-
- SHOW_MODULES(
- "SHOW\\s+MODULES",
- NO_OPERANDS),
-
- USE_CATALOG(
- "USE\\s+CATALOG\\s+(.*)",
- SINGLE_OPERAND),
-
- USE(
- "USE\\s+(?!CATALOG)(.*)",
- SINGLE_OPERAND),
-
- CREATE_CATALOG(null, SINGLE_OPERAND),
-
- DROP_CATALOG(null, SINGLE_OPERAND),
-
- DESC(
- "DESC\\s+(.*)",
- SINGLE_OPERAND),
-
- DESCRIBE(
- "DESCRIBE\\s+(.*)",
- SINGLE_OPERAND),
-
- EXPLAIN(
- "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)",
- (operands) -> {
- return Optional.of(new String[] { operands[0], operands[1] });
- }),
-
- CREATE_DATABASE(
- "(CREATE\\s+DATABASE\\s+.*)",
- SINGLE_OPERAND),
-
- DROP_DATABASE(
- "(DROP\\s+DATABASE\\s+.*)",
- SINGLE_OPERAND),
-
- ALTER_DATABASE(
- "(ALTER\\s+DATABASE\\s+.*)",
- SINGLE_OPERAND),
-
- CREATE_TABLE("(CREATE\\s+TABLE\\s+.*)", SINGLE_OPERAND),
-
- DROP_TABLE("(DROP\\s+TABLE\\s+.*)", SINGLE_OPERAND),
-
- ALTER_TABLE(
- "(ALTER\\s+TABLE\\s+.*)",
- SINGLE_OPERAND),
-
- DROP_VIEW(
- "DROP\\s+VIEW\\s+(.*)",
- SINGLE_OPERAND),
-
- CREATE_VIEW(
- "CREATE\\s+VIEW\\s+(\\S+)\\s+AS\\s+(.*)",
- (operands) -> {
- if (operands.length < 2) {
- return Optional.empty();
- }
- return Optional.of(new String[]{operands[0], operands[1]});
- }),
-
- CREATE_FUNCTION(null, SINGLE_OPERAND),
-
- DROP_FUNCTION(null, SINGLE_OPERAND),
-
- ALTER_FUNCTION(null, SINGLE_OPERAND),
-
- SELECT(
- "(SELECT.*)",
- SINGLE_OPERAND),
-
- INSERT_INTO(
- "(INSERT\\s+INTO.*)",
- SINGLE_OPERAND),
-
- INSERT_OVERWRITE(
- "(INSERT\\s+OVERWRITE.*)",
- SINGLE_OPERAND),
-
- SET(
- "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
- (operands) -> {
- if (operands.length < 3) {
- return Optional.empty();
- } else if (operands[0] == null) {
- return Optional.of(new String[0]);
- }
- return Optional.of(new String[]{operands[1], operands[2]});
- }),
-
- RESET(
- "RESET",
- NO_OPERANDS),
-
- SOURCE(
- "SOURCE\\s+(.*)",
- SINGLE_OPERAND);
-
- public final Pattern pattern;
- public final Function<String[], Optional<String[]>> operandConverter;
-
- SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
- if (matchingRegex == null) {
- this.pattern = null;
- } else {
- this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
- }
- this.operandConverter = operandConverter;
- }
-
- @Override
- public String toString() {
- return super.toString().replace('_', ' ');
- }
-
- public boolean hasOperands() {
- return operandConverter != NO_OPERANDS;
- }
- }
-
- /**
- * Call of SQL command with operands and command type.
- */
- public static class SqlCommandCall {
- public final SqlCommand command;
- public final String[] operands;
- public final String sql;
-
- public SqlCommandCall(SqlCommand command, String[] operands, String sql) {
- this.command = command;
- this.operands = operands;
- this.sql = sql;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- SqlCommandCall that = (SqlCommandCall) o;
- return command == that.command && Arrays.equals(operands, that.operands);
- }
-
- @Override
- public int hashCode() {
- int result = Objects.hash(command);
- result = 31 * result + Arrays.hashCode(operands);
- return result;
- }
-
- @Override
- public String toString() {
- return command + "(" + Arrays.toString(operands) + ")";
- }
- }
-}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
index a713d1c..e182f97 100644
--- a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112Shims.java
@@ -22,73 +22,32 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
-import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.CatalogTableSchemaResolver;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
-import org.apache.flink.table.delegation.Executor;
-import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
-import org.apache.flink.table.delegation.Planner;
-import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.delegation.*;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
@@ -97,29 +56,19 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims112.CollectStreamTableSink;
import org.apache.zeppelin.flink.shims112.Flink112ScalaShims;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
/**
@@ -128,41 +77,28 @@ import java.util.regex.Matcher;
public class Flink112Shims extends FlinkShims {
private static final Logger LOGGER = LoggerFactory.getLogger(Flink112Shims.class);
- public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
- .append("The following commands are available:\n\n")
- .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
- .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
- .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
- .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
- .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
- .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
- .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
- .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
- .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
- .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
- .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
- .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
- .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
- .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
- .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
- .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
- .style(AttributedStyle.DEFAULT.underline())
- .append("\nHint")
- .style(AttributedStyle.DEFAULT)
- .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
- .toAttributedString();
-
- private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+ private Flink112SqlInterpreter batchSqlInterpreter;
+ private Flink112SqlInterpreter streamSqlInterpreter;
+
public Flink112Shims(FlinkVersion flinkVersion, Properties properties) {
super(flinkVersion, properties);
}
+
+ public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+ this.batchSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, true);
+ }
+
+ public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+ this.streamSqlInterpreter = new Flink112SqlInterpreter(flinkSqlContext, false);
+ }
+
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
}
-
@Override
public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
return new StreamExecutionEnvironmentFactory() {
@@ -222,31 +158,6 @@ public class Flink112Shims extends FlinkShims {
}
@Override
- public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
- StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
- statementSetMap.put(context.getParagraphId(), statementSet);
- }
-
- @Override
- public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
- statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
- }
-
- @Override
- public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
- JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
- while (!jobClient.getJobStatus().get().isTerminalState()) {
- LOGGER.debug("Wait for job to finish");
- Thread.sleep(1000 * 5);
- }
- if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
- context.out.write("Job is cancelled.\n");
- return false;
- }
- return true;
- }
-
- @Override
public boolean rowEquals(Object row1, Object row2) {
Row r1 = (Row) row1;
Row r2 = (Row) row2;
@@ -292,151 +203,6 @@ public class Flink112Shims extends FlinkShims {
}
/**
- * Parse it via flink SqlParser first, then fallback to regular expression matching.
- *
- * @param tableEnv
- * @param stmt
- * @return
- */
- @Override
- public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
- Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
- SqlCommandCall sqlCommandCall = null;
- try {
- // parse statement via regex matching first
- Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
- if (callOpt.isPresent()) {
- sqlCommandCall = callOpt.get();
- } else {
- sqlCommandCall = parseBySqlParser(sqlParser, stmt);
- }
- } catch (Exception e) {
- return Optional.empty();
- }
- return Optional.of(sqlCommandCall);
-
- }
-
- private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
- List<Operation> operations;
- try {
- operations = sqlParser.parse(stmt);
- } catch (Throwable e) {
- throw new Exception("Invalidate SQL statement.", e);
- }
- if (operations.size() != 1) {
- throw new Exception("Only single statement is supported now.");
- }
-
- final SqlCommand cmd;
- String[] operands = new String[]{stmt};
- Operation operation = operations.get(0);
- if (operation instanceof CatalogSinkModifyOperation) {
- boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
- cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
- } else if (operation instanceof CreateTableOperation) {
- cmd = SqlCommand.CREATE_TABLE;
- } else if (operation instanceof DropTableOperation) {
- cmd = SqlCommand.DROP_TABLE;
- } else if (operation instanceof AlterTableOperation) {
- cmd = SqlCommand.ALTER_TABLE;
- } else if (operation instanceof CreateViewOperation) {
- cmd = SqlCommand.CREATE_VIEW;
- } else if (operation instanceof DropViewOperation) {
- cmd = SqlCommand.DROP_VIEW;
- } else if (operation instanceof CreateDatabaseOperation) {
- cmd = SqlCommand.CREATE_DATABASE;
- } else if (operation instanceof DropDatabaseOperation) {
- cmd = SqlCommand.DROP_DATABASE;
- } else if (operation instanceof AlterDatabaseOperation) {
- cmd = SqlCommand.ALTER_DATABASE;
- } else if (operation instanceof CreateCatalogOperation) {
- cmd = SqlCommand.CREATE_CATALOG;
- } else if (operation instanceof DropCatalogOperation) {
- cmd = SqlCommand.DROP_CATALOG;
- } else if (operation instanceof UseCatalogOperation) {
- cmd = SqlCommand.USE_CATALOG;
- operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
- } else if (operation instanceof UseDatabaseOperation) {
- cmd = SqlCommand.USE;
- operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
- } else if (operation instanceof ShowCatalogsOperation) {
- cmd = SqlCommand.SHOW_CATALOGS;
- operands = new String[0];
- } else if (operation instanceof ShowDatabasesOperation) {
- cmd = SqlCommand.SHOW_DATABASES;
- operands = new String[0];
- } else if (operation instanceof ShowTablesOperation) {
- cmd = SqlCommand.SHOW_TABLES;
- operands = new String[0];
- } else if (operation instanceof ShowFunctionsOperation) {
- cmd = SqlCommand.SHOW_FUNCTIONS;
- operands = new String[0];
- } else if (operation instanceof CreateCatalogFunctionOperation ||
- operation instanceof CreateTempSystemFunctionOperation) {
- cmd = SqlCommand.CREATE_FUNCTION;
- } else if (operation instanceof DropCatalogFunctionOperation ||
- operation instanceof DropTempSystemFunctionOperation) {
- cmd = SqlCommand.DROP_FUNCTION;
- } else if (operation instanceof AlterCatalogFunctionOperation) {
- cmd = SqlCommand.ALTER_FUNCTION;
- } else if (operation instanceof ExplainOperation) {
- cmd = SqlCommand.EXPLAIN;
- } else if (operation instanceof DescribeTableOperation) {
- cmd = SqlCommand.DESCRIBE;
- operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
- } else if (operation instanceof QueryOperation) {
- cmd = SqlCommand.SELECT;
- } else {
- throw new Exception("Unknown operation: " + operation.asSummaryString());
- }
-
- return new SqlCommandCall(cmd, operands, stmt);
- }
-
- private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
- // parse statement via regex matching
- for (SqlCommand cmd : SqlCommand.values()) {
- if (cmd.pattern != null) {
- final Matcher matcher = cmd.pattern.matcher(stmt);
- if (matcher.matches()) {
- final String[] groups = new String[matcher.groupCount()];
- for (int i = 0; i < groups.length; i++) {
- groups[i] = matcher.group(i + 1);
- }
- return cmd.operandConverter.apply(groups)
- .map((operands) -> {
- String[] newOperands = operands;
- if (cmd == SqlCommand.EXPLAIN) {
- // convert `explain xx` to `explain plan for xx`
- // which can execute through executeSql method
- newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
- }
- return new SqlCommandCall(cmd, newOperands, stmt);
- });
- }
- }
- }
- return Optional.empty();
- }
-
- @Override
- public void executeSql(Object tableEnv, String sql) {
- ((TableEnvironment) tableEnv).executeSql(sql);
- }
-
- @Override
- public String explain(Object tableEnv, String sql) {
- TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
- return tableResult.collect().next().getField(0).toString();
- }
-
- @Override
- public String sqlHelp() {
- return MESSAGE_HELP.toString();
- }
-
- /**
* Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
* This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
* @param catalogManager
@@ -464,36 +230,6 @@ public class Flink112Shims extends FlinkShims {
}
@Override
- public Map extractTableConfigOptions() {
- Map<String, ConfigOption> configOptions = new HashMap<>();
- configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
- configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
- try {
- configOptions.putAll(extractConfigOptions(PythonOptions.class));
- } catch (NoClassDefFoundError e) {
- LOGGER.warn("No pyflink jars found");
- }
- configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
- return configOptions;
- }
-
- private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
- Map<String, ConfigOption> configOptions = new HashMap();
- Field[] fields = clazz.getDeclaredFields();
- for (Field field : fields) {
- if (field.getType().isAssignableFrom(ConfigOption.class)) {
- try {
- ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
- configOptions.put(configOption.key(), configOption);
- } catch (Throwable e) {
- LOGGER.warn("Fail to get ConfigOption", e);
- }
- }
- }
- return configOptions;
- }
-
- @Override
public String[] rowToString(Object row, Object table, Object tableConfig) {
return PrintUtils.rowToString((Row) row);
}
@@ -511,10 +247,10 @@ public class Flink112Shims extends FlinkShims {
Method createMethod = executorFactory.getClass()
.getMethod("create", Map.class, StreamExecutionEnvironment.class);
- return (Executor) createMethod.invoke(
+ return createMethod.invoke(
executorFactory,
executorProperties,
- (StreamExecutionEnvironment) sEnv);
+ sEnv);
} catch (Exception e) {
throw new TableException(
"Could not instantiate the executor. Make sure a planner module is on the classpath",
@@ -535,4 +271,12 @@ public class Flink112Shims extends FlinkShims {
(CatalogManager) catalogManager);
return ImmutablePair.of(planner, executor);
}
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
+ if (isBatch) {
+ return batchSqlInterpreter.runSqlList(st, context);
+ } else {
+ return streamSqlInterpreter.runSqlList(st, context);
+ }
+ }
}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java
new file mode 100644
index 0000000..b72ef26
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/Flink112SqlInterpreter.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.zeppelin.flink.shims112.SqlCommandParser;
+import org.apache.zeppelin.flink.shims112.SqlCommandParser.SqlCommand;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+public class Flink112SqlInterpreter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink112SqlInterpreter.class);
+ private static final AttributedString MESSAGE_HELP =
+ new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(
+ formatCommand(
+ SqlCommand.CREATE_TABLE,
+ "Create table under current catalog and database."))
+ .append(
+ formatCommand(
+ SqlCommand.DROP_TABLE,
+ "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
+ .append(
+ formatCommand(
+ SqlCommand.CREATE_VIEW,
+ "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+ .append(
+ formatCommand(
+ SqlCommand.DESCRIBE,
+ "Describes the schema of a table with the given name."))
+ .append(
+ formatCommand(
+ SqlCommand.DROP_VIEW,
+ "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
+ .append(
+ formatCommand(
+ SqlCommand.EXPLAIN,
+ "Describes the execution plan of a query or table with the given name."))
+ .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
+ .append(
+ formatCommand(
+ SqlCommand.INSERT_INTO,
+ "Inserts the results of a SQL SELECT query into a declared table sink."))
+ .append(
+ formatCommand(
+ SqlCommand.INSERT_OVERWRITE,
+ "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
+ .append(
+ formatCommand(
+ SqlCommand.SELECT,
+ "Executes a SQL SELECT query on the Flink cluster."))
+ .append(
+ formatCommand(
+ SqlCommand.SET,
+ "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+ .append(
+ formatCommand(
+ SqlCommand.SHOW_FUNCTIONS,
+ "Shows all user-defined and built-in functions."))
+ .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
+ .append(
+ formatCommand(
+ SqlCommand.USE_CATALOG,
+ "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+ .append(
+ formatCommand(
+ SqlCommand.USE,
+ "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
+ .toAttributedString();
+
+ private static AttributedString formatCommand(SqlCommandParser.SqlCommand cmd, String description) {
+ return new AttributedStringBuilder()
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(cmd.toString())
+ .append("\t\t")
+ .style(AttributedStyle.DEFAULT)
+ .append(description)
+ .append('\n')
+ .toAttributedString();
+ }
+
+ private FlinkSqlContext flinkSqlContext;
+ private TableEnvironment tbenv;
+ private ZeppelinContext z;
+ private SqlCommandParser sqlCommandParser;
+ private SqlSplitter sqlSplitter;
+ private boolean isBatch;
+ private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+ // paragraphId -> StatementSet
+ private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+
+ public Flink112SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+ this.flinkSqlContext = flinkSqlContext;
+ this.isBatch = isBatch;
+ if (isBatch) {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+ } else {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+ }
+ this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+ this.sqlCommandParser = new SqlCommandParser((TableEnvironmentInternal) tbenv);
+ this.sqlSplitter = new SqlSplitter();
+ JobListener jobListener = new JobListener() {
+ @Override
+ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ LOGGER.info("UnLock JobSubmitLock");
+ }
+ }
+
+ @Override
+ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+ }
+ };
+
+ ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+ ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+ }
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context) {
+ try {
+ boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+ List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
+ boolean isFirstInsert = true;
+ boolean hasInsert = false;
+ for (String sql : sqls) {
+ SqlCommandParser.SqlCommandCall sqlCommand = null;
+ try {
+ sqlCommand = sqlCommandParser.parse(sql);
+ } catch (Exception e1) {
+ try {
+ context.out.write("%text Invalid Sql statement: " + sql + "\n");
+ context.out.write(e1.toString());
+ context.out.write(MESSAGE_HELP.toString());
+ } catch (IOException e2) {
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e2.toString());
+ }
+ return new InterpreterResult(InterpreterResult.Code.ERROR);
+ }
+
+ try {
+ if (sqlCommand.command == SqlCommand.INSERT_INTO ||
+ sqlCommand.command == SqlCommand.INSERT_OVERWRITE) {
+ hasInsert = true;
+ if (isFirstInsert && runAsOne) {
+ startMultipleInsert(context);
+ isFirstInsert = false;
+ }
+ }
+ callCommand(sqlCommand, sql, context);
+ context.out.flush();
+ } catch (Throwable e) {
+ LOGGER.error("Fail to run sql:" + sql, e);
+ try {
+ context.out.write("%text Fail to run sql command: " +
+ sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+ } catch (IOException ex) {
+ LOGGER.warn("Unexpected exception:", ex);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ ExceptionUtils.getStackTrace(e));
+ }
+ return new InterpreterResult(InterpreterResult.Code.ERROR);
+ }
+ }
+
+ if (runAsOne && hasInsert) {
+ try {
+ lock.lock();
+ String jobName = context.getStringLocalProperty("jobName", st);
+ if (executeMultipleInsertInto(jobName, context)) {
+ context.out.write("Insertion successfully.\n");
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql as one job", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ statementSetMap.remove(context.getParagraphId());
+ }
+
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ }
+
+ private void callCommand(SqlCommandParser.SqlCommandCall cmdCall,
+ String sql,
+ InterpreterContext context) throws Exception {
+ switch (cmdCall.command) {
+ case SET:
+ callSet(cmdCall, context);
+ break;
+ case HELP:
+ callHelp(context);
+ break;
+ case SHOW_CATALOGS:
+ callShowCatalogs(context);
+ break;
+ case SHOW_CURRENT_CATALOG:
+ callShowCurrentCatalog(context);
+ break;
+ case SHOW_DATABASES:
+ callShowDatabases(context);
+ break;
+ case SHOW_CURRENT_DATABASE:
+ callShowCurrentDatabase(context);
+ break;
+ case SHOW_TABLES:
+ callShowTables(context);
+ break;
+ case SHOW_FUNCTIONS:
+ callShowFunctions(context);
+ break;
+ case SHOW_MODULES:
+ callShowModules(context);
+ break;
+ case SHOW_PARTITIONS:
+ callShowPartitions(sql, context);
+ break;
+ case USE_CATALOG:
+ callUseCatalog(cmdCall.operands[0], context);
+ break;
+ case USE:
+ callUseDatabase(cmdCall.operands[0], context);
+ break;
+ case DESC:
+ case DESCRIBE:
+ callDescribe(cmdCall.operands[0], context);
+ break;
+ case EXPLAIN:
+ callExplain(cmdCall.operands[0], context);
+ break;
+ case SELECT:
+ callSelect(cmdCall.operands[0], context);
+ break;
+ case INSERT_INTO:
+ case INSERT_OVERWRITE:
+ callInsertInto(cmdCall.operands[0], context);
+ break;
+ case CREATE_TABLE:
+ callDDL(sql, context, "Table has been created.");
+ break;
+ case DROP_TABLE:
+ callDDL(sql, context, "Table has been dropped.");
+ break;
+ case ALTER_TABLE:
+ callDDL(sql, context, "Alter table succeeded!");
+ break;
+ case CREATE_VIEW:
+ callDDL(sql, context, "View has been created.");
+ break;
+ case DROP_VIEW:
+ callDDL(sql, context, "View has been dropped.");
+ break;
+ case ALTER_VIEW:
+ callDDL(sql, context, "Alter view succeeded!");
+ break;
+ case CREATE_FUNCTION:
+ callDDL(sql, context, "Function has been created.");
+ break;
+ case DROP_FUNCTION:
+ callDDL(sql, context, "Function has been removed.");
+ break;
+ case ALTER_FUNCTION:
+ callDDL(sql, context, "Alter function succeeded!");
+ break;
+ case CREATE_DATABASE:
+ callDDL(sql, context, "Database has been created.");
+ break;
+ case DROP_DATABASE:
+ callDDL(sql, context, "Database has been removed.");
+ break;
+ case ALTER_DATABASE:
+ callDDL(sql, context, "Alter database succeeded!");
+ break;
+ case CREATE_CATALOG:
+ callDDL(sql, context, "Catalog has been created.");
+ break;
+ case DROP_CATALOG:
+ callDDL(sql, context, "Catalog has been dropped.");
+ break;
+ default:
+ throw new Exception("Unsupported command: " + cmdCall.command);
+ }
+ }
+
+ private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+ try {
+ lock.lock();
+ this.tbenv.executeSql(sql);
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ context.out.write(message + "\n");
+ }
+
+ private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+ tbenv.executeSql("USE CATALOG `" + catalog + "`");
+ }
+
+ private void callHelp(InterpreterContext context) throws IOException {
+ context.out.write(MESSAGE_HELP.toString() + "\n");
+ }
+
+ private void callShowCatalogs(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+ List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+ }
+
+ private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+ String catalog = tableResult.collect().next().toString();
+ context.out.write("%text current catalog: " + catalog + "\n");
+ }
+
+ private void callShowDatabases(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+ List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+ }
+
+ private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+ String database = tableResult.collect().next().toString();
+ context.out.write("%text current database: " + database + "\n");
+ }
+
+ private void callShowTables(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+ List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+ }
+
+ private void callShowFunctions(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+ List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+ }
+
+ private void callShowModules(InterpreterContext context) throws IOException {
+ String[] modules = this.tbenv.listModules();
+ context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+ }
+
+ private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql(sql);
+ List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table partitions\n" + StringUtils.join(functions, "\n") + "\n");
+ }
+
+ public void startMultipleInsert(InterpreterContext context) throws Exception {
+ StatementSet statementSet = tbenv.createStatementSet();
+ statementSetMap.put(context.getParagraphId(), statementSet);
+ }
+
+ public void addInsertStatement(String sql, InterpreterContext context) throws Exception {
+ statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
+ }
+
+ public boolean executeMultipleInsertInto(String jobName, InterpreterContext context) throws Exception {
+ JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
+ while (!jobClient.getJobStatus().get().isTerminalState()) {
+ LOGGER.debug("Wait for job to finish");
+ Thread.sleep(1000 * 5);
+ }
+ if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
+ context.out.write("Job is cancelled.\n");
+ return false;
+ }
+ return true;
+ }
+
+ private void callUseDatabase(String databaseName,
+ InterpreterContext context) throws IOException {
+ this.tbenv.executeSql("USE `" + databaseName + "`");
+ }
+
+ private void callDescribe(String name, InterpreterContext context) throws IOException {
+ TableResult tableResult = null;
+ try {
+ tableResult = tbenv.executeSql("DESCRIBE " + name);
+ } catch (Exception e) {
+ throw new IOException("Fail to describe table: " + name, e);
+ }
+ CloseableIterator<Row> result = tableResult.collect();
+ StringBuilder builder = new StringBuilder();
+ builder.append("Column\tType\n");
+ while (result.hasNext()) {
+ Row row = result.next();
+ builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+ }
+ context.out.write("%table\n" + builder.toString());
+ }
+
+ private void callExplain(String sql, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ TableResult tableResult = tbenv.executeSql(sql);
+ String result = tableResult.collect().next().getField(0).toString();
+ context.out.write(result + "\n");
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callSelect(String sql, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ if (isBatch) {
+ callBatchInnerSelect(sql, context);
+ } else {
+ callStreamInnerSelect(sql, context);
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+ Table table = this.tbenv.sqlQuery(sql);
+ String result = z.showData(table);
+ context.out.write(result);
+ }
+
+ public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+ flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+ }
+
+ private String removeSingleQuote(String value) {
+ value = value.trim();
+ if (value.startsWith("'")) {
+ value = value.substring(1);
+ }
+ if (value.endsWith("'")) {
+ value = value.substring(0, value.length() - 1);
+ }
+ return value;
+ }
+
+ public void callSet(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws Exception {
+ if (sqlCommand.operands.length == 0) {
+ // show all properties
+ final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+ List<String> prettyEntries = new ArrayList<>();
+ for (String k : properties.keySet()) {
+ prettyEntries.add(
+ String.format(
+ "'%s' = '%s'",
+ EncodingUtils.escapeSingleQuotes(k),
+ EncodingUtils.escapeSingleQuotes(properties.get(k))));
+ }
+ prettyEntries.sort(String::compareTo);
+ prettyEntries.forEach(entry -> {
+ try {
+ context.out.write(entry + "\n");
+ } catch (IOException e) {
+ LOGGER.warn("Fail to write output", e);
+ }
+ });
+ } else {
+ String key = removeSingleQuote(sqlCommand.operands[0]);
+ String value = removeSingleQuote(sqlCommand.operands[1]);
+ if ("execution.runtime-mode".equals(key)) {
+ throw new UnsupportedOperationException("execution.runtime-mode is not supported to set, " +
+ "you can use %flink.ssql & %flink.bsql to switch between streaming mode and batch mode");
+ }
+ LOGGER.info("Set table config: {}={}", key, value);
+ this.tbenv.getConfig().getConfiguration().setString(key, value);
+ }
+ }
+
+ public void callInsertInto(String sql,
+ InterpreterContext context) throws IOException {
+ if (!isBatch) {
+ context.getLocalProperties().put("flink.streaming.insert_into", "true");
+ }
+ try {
+ lock.lock();
+ boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+ if (!runAsOne) {
+ this.tbenv.sqlUpdate(sql);
+ String jobName = context.getStringLocalProperty("jobName", sql);
+ this.tbenv.execute(jobName);
+ context.out.write("Insertion successfully.\n");
+ } else {
+ addInsertStatement(sql, context);
+ }
+ } catch (Exception e) {
+ throw new IOException(e);
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+}
diff --git a/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java
new file mode 100644
index 0000000..309250f
--- /dev/null
+++ b/flink/flink1.12-shims/src/main/java/org/apache/zeppelin/flink/shims112/SqlCommandParser.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink.shims112;
+
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.ddl.*;
+
+import javax.annotation.Nullable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * This is copied from flink project with minor modification.
+ * Simple parser for determining the type of command and its parameters.
+ * */
+public final class SqlCommandParser {
+
+ private Parser parser;
+
+ public SqlCommandParser(TableEnvironmentInternal tbenv) {
+ this.parser = tbenv.getParser();
+ }
+
+ /**
+ * Parse a sql statement and return corresponding {@link SqlCommandCall}. If the statement is
+ * invalid, a {@link Exception} will be thrown.
+ *
+ * @param stmt The statement to be parsed
+ * @return the corresponding SqlCommandCall.
+ */
+ public SqlCommandCall parse(String stmt) throws Exception {
+ // normalize
+ stmt = stmt.trim();
+ // remove ';' at the end
+ if (stmt.endsWith(";")) {
+ stmt = stmt.substring(0, stmt.length() - 1).trim();
+ }
+
+ // parse statement via regex matching first
+ Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
+ if (callOpt.isPresent()) {
+ return callOpt.get();
+ } else {
+ return parseBySqlParser(stmt);
+ }
+ }
+
+ private SqlCommandCall parseBySqlParser(String stmt) throws Exception {
+ List<Operation> operations;
+ try {
+ operations = parser.parse(stmt);
+ } catch (Throwable e) {
+ throw new Exception("Invalidate SQL statement.", e);
+ }
+ if (operations.size() != 1) {
+ throw new Exception("Only single statement is supported now.");
+ }
+
+ final SqlCommand cmd;
+ String[] operands = new String[] {stmt};
+ Operation operation = operations.get(0);
+ if (operation instanceof CatalogSinkModifyOperation) {
+ boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
+ cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
+ } else if (operation instanceof CreateTableOperation) {
+ cmd = SqlCommand.CREATE_TABLE;
+ } else if (operation instanceof DropTableOperation) {
+ cmd = SqlCommand.DROP_TABLE;
+ } else if (operation instanceof AlterTableOperation) {
+ cmd = SqlCommand.ALTER_TABLE;
+ } else if (operation instanceof CreateViewOperation) {
+ cmd = SqlCommand.CREATE_VIEW;
+ } else if (operation instanceof DropViewOperation) {
+ cmd = SqlCommand.DROP_VIEW;
+ } else if (operation instanceof AlterViewOperation) {
+ cmd = SqlCommand.ALTER_VIEW;
+ } else if (operation instanceof CreateDatabaseOperation) {
+ cmd = SqlCommand.CREATE_DATABASE;
+ } else if (operation instanceof DropDatabaseOperation) {
+ cmd = SqlCommand.DROP_DATABASE;
+ } else if (operation instanceof AlterDatabaseOperation) {
+ cmd = SqlCommand.ALTER_DATABASE;
+ } else if (operation instanceof CreateCatalogOperation) {
+ cmd = SqlCommand.CREATE_CATALOG;
+ } else if (operation instanceof DropCatalogOperation) {
+ cmd = SqlCommand.DROP_CATALOG;
+ } else if (operation instanceof UseCatalogOperation) {
+ cmd = SqlCommand.USE_CATALOG;
+ operands = new String[] {((UseCatalogOperation) operation).getCatalogName()};
+ } else if (operation instanceof UseDatabaseOperation) {
+ cmd = SqlCommand.USE;
+ operands = new String[] {((UseDatabaseOperation) operation).getDatabaseName()};
+ } else if (operation instanceof ShowCatalogsOperation) {
+ cmd = SqlCommand.SHOW_CATALOGS;
+ operands = new String[0];
+ } else if (operation instanceof ShowCurrentCatalogOperation) {
+ cmd = SqlCommand.SHOW_CURRENT_CATALOG;
+ operands = new String[0];
+ } else if (operation instanceof ShowDatabasesOperation) {
+ cmd = SqlCommand.SHOW_DATABASES;
+ operands = new String[0];
+ } else if (operation instanceof ShowCurrentDatabaseOperation) {
+ cmd = SqlCommand.SHOW_CURRENT_DATABASE;
+ operands = new String[0];
+ } else if (operation instanceof ShowTablesOperation) {
+ cmd = SqlCommand.SHOW_TABLES;
+ operands = new String[0];
+ } else if (operation instanceof ShowFunctionsOperation) {
+ cmd = SqlCommand.SHOW_FUNCTIONS;
+ operands = new String[0];
+ } else if (operation instanceof ShowPartitionsOperation) {
+ cmd = SqlCommand.SHOW_PARTITIONS;
+ } else if (operation instanceof CreateCatalogFunctionOperation
+ || operation instanceof CreateTempSystemFunctionOperation) {
+ cmd = SqlCommand.CREATE_FUNCTION;
+ } else if (operation instanceof DropCatalogFunctionOperation
+ || operation instanceof DropTempSystemFunctionOperation) {
+ cmd = SqlCommand.DROP_FUNCTION;
+ } else if (operation instanceof AlterCatalogFunctionOperation) {
+ cmd = SqlCommand.ALTER_FUNCTION;
+ } else if (operation instanceof ExplainOperation) {
+ cmd = SqlCommand.EXPLAIN;
+ } else if (operation instanceof DescribeTableOperation) {
+ cmd = SqlCommand.DESCRIBE;
+ operands =
+ new String[] {
+ ((DescribeTableOperation) operation)
+ .getSqlIdentifier()
+ .asSerializableString()
+ };
+ } else if (operation instanceof QueryOperation) {
+ cmd = SqlCommand.SELECT;
+ } else {
+ throw new Exception("Unknown operation: " + operation.asSummaryString());
+ }
+
+ return new SqlCommandCall(cmd, operands);
+ }
+
+ private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
+ // parse statement via regex matching
+ for (SqlCommand cmd : SqlCommand.values()) {
+ if (cmd.hasRegexPattern()) {
+ final Matcher matcher = cmd.pattern.matcher(stmt);
+ if (matcher.matches()) {
+ final String[] groups = new String[matcher.groupCount()];
+ for (int i = 0; i < groups.length; i++) {
+ groups[i] = matcher.group(i + 1);
+ }
+ return cmd.operandConverter
+ .apply(groups)
+ .map(
+ (operands) -> {
+ String[] newOperands = operands;
+ if (cmd == SqlCommand.EXPLAIN) {
+ // convert `explain xx` to `explain plan for xx`
+ // which can execute through executeSql method
+ newOperands =
+ new String[] {
+ "EXPLAIN PLAN FOR "
+ + operands[0]
+ + " "
+ + operands[1]
+ };
+ }
+ return new SqlCommandCall(cmd, newOperands);
+ });
+ }
+ }
+ }
+ return Optional.empty();
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ private static final Function<String[], Optional<String[]>> NO_OPERANDS =
+ (operands) -> Optional.of(new String[0]);
+
+ private static final Function<String[], Optional<String[]>> SINGLE_OPERAND =
+ (operands) -> Optional.of(new String[] {operands[0]});
+
+ private static final int DEFAULT_PATTERN_FLAGS = Pattern.CASE_INSENSITIVE | Pattern.DOTALL;
+
+ /** Supported SQL commands. */
+ public enum SqlCommand {
+
+ HELP("HELP", NO_OPERANDS),
+
+ SHOW_CATALOGS,
+
+ SHOW_CURRENT_CATALOG,
+
+ SHOW_DATABASES,
+
+ SHOW_CURRENT_DATABASE,
+
+ SHOW_TABLES,
+
+ SHOW_FUNCTIONS,
+
+ // FLINK-17396
+ SHOW_MODULES("SHOW\\s+MODULES", NO_OPERANDS),
+
+ SHOW_PARTITIONS,
+
+ USE_CATALOG,
+
+ USE,
+
+ CREATE_CATALOG,
+
+ DROP_CATALOG,
+
+ DESC("DESC\\s+(.*)", SINGLE_OPERAND),
+
+ DESCRIBE,
+
+ // supports both `explain xx` and `explain plan for xx` now
+ // TODO should keep `explain xx` ?
+ // only match "EXPLAIN SELECT xx" and "EXPLAIN INSERT xx" here
+ // "EXPLAIN PLAN FOR xx" should be parsed via sql parser
+ EXPLAIN(
+ "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)",
+ (operands) -> {
+ return Optional.of(new String[] {operands[0], operands[1]});
+ }),
+
+ CREATE_DATABASE,
+
+ DROP_DATABASE,
+
+ ALTER_DATABASE,
+
+ CREATE_TABLE,
+
+ DROP_TABLE,
+
+ ALTER_TABLE,
+
+ CREATE_VIEW,
+
+ DROP_VIEW,
+
+ ALTER_VIEW,
+
+ CREATE_FUNCTION,
+
+ DROP_FUNCTION,
+
+ ALTER_FUNCTION,
+
+ SELECT,
+
+ INSERT_INTO,
+
+ INSERT_OVERWRITE,
+
+ SET(
+ "SET(\\s+(\\S+)\\s*=(.*))?", // whitespace is only ignored on the left side of '='
+ (operands) -> {
+ if (operands.length < 3) {
+ return Optional.empty();
+ } else if (operands[0] == null) {
+ return Optional.of(new String[0]);
+ }
+ return Optional.of(new String[] {operands[1], operands[2]});
+ }),
+
+ SOURCE("SOURCE\\s+(.*)", SINGLE_OPERAND);
+
+ public final @Nullable Pattern pattern;
+ public final @Nullable Function<String[], Optional<String[]>> operandConverter;
+
+ SqlCommand() {
+ this.pattern = null;
+ this.operandConverter = null;
+ }
+
+ SqlCommand(String matchingRegex, Function<String[], Optional<String[]>> operandConverter) {
+ this.pattern = Pattern.compile(matchingRegex, DEFAULT_PATTERN_FLAGS);
+ this.operandConverter = operandConverter;
+ }
+
+ @Override
+ public String toString() {
+ return super.toString().replace('_', ' ');
+ }
+
+ public boolean hasOperands() {
+ return operandConverter != NO_OPERANDS;
+ }
+
+ public boolean hasRegexPattern() {
+ return pattern != null;
+ }
+ }
+
+ /** Call of SQL command with operands and command type. */
+ public static class SqlCommandCall {
+ public final SqlCommand command;
+ public final String[] operands;
+
+ public SqlCommandCall(SqlCommand command, String[] operands) {
+ this.command = command;
+ this.operands = operands;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SqlCommandCall that = (SqlCommandCall) o;
+ return command == that.command && Arrays.equals(operands, that.operands);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(command);
+ result = 31 * result + Arrays.hashCode(operands);
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return command + "(" + Arrays.toString(operands) + ")";
+ }
+ }
+}
\ No newline at end of file
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
index 481bfee..a4743e8 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113Shims.java
@@ -22,7 +22,6 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -30,36 +29,21 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.scala.DataSet;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.PlannerType;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
-import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
@@ -67,32 +51,6 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
@@ -101,30 +59,20 @@ import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims113.CollectStreamTableSink;
import org.apache.zeppelin.flink.shims113.Flink113ScalaShims;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.time.ZoneId;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
/**
@@ -133,42 +81,27 @@ import java.util.regex.Matcher;
public class Flink113Shims extends FlinkShims {
private static final Logger LOGGER = LoggerFactory.getLogger(Flink113Shims.class);
- public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
- .append("The following commands are available:\n\n")
- .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
- .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
- .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
- .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
- .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
- .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
- .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
- .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
- .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
- .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
- .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
- .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
- .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
- .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
- .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
- .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
- .style(AttributedStyle.DEFAULT.underline())
- .append("\nHint")
- .style(AttributedStyle.DEFAULT)
- .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
- .toAttributedString();
-
- private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+ private Flink113SqlInterpreter batchSqlInterpreter;
+ private Flink113SqlInterpreter streamSqlInterpreter;
public Flink113Shims(FlinkVersion flinkVersion, Properties properties) {
super(flinkVersion, properties);
}
+ public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+ this.batchSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, true);
+ }
+
+ public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+ this.streamSqlInterpreter = new Flink113SqlInterpreter(flinkSqlContext, false);
+ }
+
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
}
-
@Override
public Object createStreamExecutionEnvironmentFactory(Object streamExecutionEnvironment) {
return new StreamExecutionEnvironmentFactory() {
@@ -228,31 +161,6 @@ public class Flink113Shims extends FlinkShims {
}
@Override
- public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
- StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
- statementSetMap.put(context.getParagraphId(), statementSet);
- }
-
- @Override
- public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
- statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
- }
-
- @Override
- public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
- JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
- while (!jobClient.getJobStatus().get().isTerminalState()) {
- LOGGER.debug("Wait for job to finish");
- Thread.sleep(1000 * 5);
- }
- if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
- context.out.write("Job is cancelled.\n");
- return false;
- }
- return true;
- }
-
- @Override
public boolean rowEquals(Object row1, Object row2) {
Row r1 = (Row) row1;
Row r2 = (Row) row2;
@@ -298,151 +206,6 @@ public class Flink113Shims extends FlinkShims {
}
/**
- * Parse it via flink SqlParser first, then fallback to regular expression matching.
- *
- * @param tableEnv
- * @param stmt
- * @return
- */
- @Override
- public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
- Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
- SqlCommandCall sqlCommandCall = null;
- try {
- // parse statement via regex matching first
- Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
- if (callOpt.isPresent()) {
- sqlCommandCall = callOpt.get();
- } else {
- sqlCommandCall = parseBySqlParser(sqlParser, stmt);
- }
- } catch (Exception e) {
- return Optional.empty();
- }
- return Optional.of(sqlCommandCall);
-
- }
-
- private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
- List<Operation> operations;
- try {
- operations = sqlParser.parse(stmt);
- } catch (Throwable e) {
- throw new Exception("Invalidate SQL statement.", e);
- }
- if (operations.size() != 1) {
- throw new Exception("Only single statement is supported now.");
- }
-
- final SqlCommand cmd;
- String[] operands = new String[]{stmt};
- Operation operation = operations.get(0);
- if (operation instanceof CatalogSinkModifyOperation) {
- boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
- cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
- } else if (operation instanceof CreateTableOperation) {
- cmd = SqlCommand.CREATE_TABLE;
- } else if (operation instanceof DropTableOperation) {
- cmd = SqlCommand.DROP_TABLE;
- } else if (operation instanceof AlterTableOperation) {
- cmd = SqlCommand.ALTER_TABLE;
- } else if (operation instanceof CreateViewOperation) {
- cmd = SqlCommand.CREATE_VIEW;
- } else if (operation instanceof DropViewOperation) {
- cmd = SqlCommand.DROP_VIEW;
- } else if (operation instanceof CreateDatabaseOperation) {
- cmd = SqlCommand.CREATE_DATABASE;
- } else if (operation instanceof DropDatabaseOperation) {
- cmd = SqlCommand.DROP_DATABASE;
- } else if (operation instanceof AlterDatabaseOperation) {
- cmd = SqlCommand.ALTER_DATABASE;
- } else if (operation instanceof CreateCatalogOperation) {
- cmd = SqlCommand.CREATE_CATALOG;
- } else if (operation instanceof DropCatalogOperation) {
- cmd = SqlCommand.DROP_CATALOG;
- } else if (operation instanceof UseCatalogOperation) {
- cmd = SqlCommand.USE_CATALOG;
- operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
- } else if (operation instanceof UseDatabaseOperation) {
- cmd = SqlCommand.USE;
- operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
- } else if (operation instanceof ShowCatalogsOperation) {
- cmd = SqlCommand.SHOW_CATALOGS;
- operands = new String[0];
- } else if (operation instanceof ShowDatabasesOperation) {
- cmd = SqlCommand.SHOW_DATABASES;
- operands = new String[0];
- } else if (operation instanceof ShowTablesOperation) {
- cmd = SqlCommand.SHOW_TABLES;
- operands = new String[0];
- } else if (operation instanceof ShowFunctionsOperation) {
- cmd = SqlCommand.SHOW_FUNCTIONS;
- operands = new String[0];
- } else if (operation instanceof CreateCatalogFunctionOperation ||
- operation instanceof CreateTempSystemFunctionOperation) {
- cmd = SqlCommand.CREATE_FUNCTION;
- } else if (operation instanceof DropCatalogFunctionOperation ||
- operation instanceof DropTempSystemFunctionOperation) {
- cmd = SqlCommand.DROP_FUNCTION;
- } else if (operation instanceof AlterCatalogFunctionOperation) {
- cmd = SqlCommand.ALTER_FUNCTION;
- } else if (operation instanceof ExplainOperation) {
- cmd = SqlCommand.EXPLAIN;
- } else if (operation instanceof DescribeTableOperation) {
- cmd = SqlCommand.DESCRIBE;
- operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
- } else if (operation instanceof QueryOperation) {
- cmd = SqlCommand.SELECT;
- } else {
- throw new Exception("Unknown operation: " + operation.asSummaryString());
- }
-
- return new SqlCommandCall(cmd, operands, stmt);
- }
-
- private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
- // parse statement via regex matching
- for (SqlCommand cmd : SqlCommand.values()) {
- if (cmd.pattern != null) {
- final Matcher matcher = cmd.pattern.matcher(stmt);
- if (matcher.matches()) {
- final String[] groups = new String[matcher.groupCount()];
- for (int i = 0; i < groups.length; i++) {
- groups[i] = matcher.group(i + 1);
- }
- return cmd.operandConverter.apply(groups)
- .map((operands) -> {
- String[] newOperands = operands;
- if (cmd == SqlCommand.EXPLAIN) {
- // convert `explain xx` to `explain plan for xx`
- // which can execute through executeSql method
- newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
- }
- return new SqlCommandCall(cmd, newOperands, stmt);
- });
- }
- }
- }
- return Optional.empty();
- }
-
- @Override
- public void executeSql(Object tableEnv, String sql) {
- ((TableEnvironment) tableEnv).executeSql(sql);
- }
-
- @Override
- public String explain(Object tableEnv, String sql) {
- TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
- return tableResult.collect().next().getField(0).toString();
- }
-
- @Override
- public String sqlHelp() {
- return MESSAGE_HELP.toString();
- }
-
- /**
* Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
* This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
* @param catalogManager
@@ -468,36 +231,6 @@ public class Flink113Shims extends FlinkShims {
}
@Override
- public Map extractTableConfigOptions() {
- Map<String, ConfigOption> configOptions = new HashMap<>();
- configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
- configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
- try {
- configOptions.putAll(extractConfigOptions(PythonOptions.class));
- } catch (NoClassDefFoundError e) {
- LOGGER.warn("No pyflink jars found");
- }
- configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
- return configOptions;
- }
-
- private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
- Map<String, ConfigOption> configOptions = new HashMap();
- Field[] fields = clazz.getDeclaredFields();
- for (Field field : fields) {
- if (field.getType().isAssignableFrom(ConfigOption.class)) {
- try {
- ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
- configOptions.put(configOption.key(), configOption);
- } catch (Throwable e) {
- LOGGER.warn("Fail to get ConfigOption", e);
- }
- }
- }
- return configOptions;
- }
-
- @Override
public void setBatchRuntimeMode(Object tableConfig) {
((TableConfig) tableConfig).getConfiguration()
.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
@@ -558,4 +291,12 @@ public class Flink113Shims extends FlinkShims {
(CatalogManager) catalogManager);
return ImmutablePair.of(planner, executor);
}
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
+ if (isBatch) {
+ return batchSqlInterpreter.runSqlList(st, context);
+ } else {
+ return streamSqlInterpreter.runSqlList(st, context);
+ }
+ }
}
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
new file mode 100644
index 0000000..e3fe741
--- /dev/null
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.*;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class Flink113SqlInterpreter {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(Flink113SqlInterpreter.class);
+
+ private static final AttributedString MESSAGE_HELP =
+ new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(
+ formatCommand(
+ "CREATE TABLE",
+ "Create table under current catalog and database."))
+ .append(
+ formatCommand(
+ "DROP TABLE",
+ "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
+ .append(
+ formatCommand(
+ "CREATE VIEW",
+ "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
+ .append(
+ formatCommand(
+ "DESCRIBE",
+ "Describes the schema of a table with the given name."))
+ .append(
+ formatCommand(
+ "DROP VIEW",
+ "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
+ .append(
+ formatCommand(
+ "EXPLAIN",
+ "Describes the execution plan of a query or table with the given name."))
+ .append(formatCommand("HELP", "Prints the available commands."))
+ .append(
+ formatCommand(
+ "INSERT INTO",
+ "Inserts the results of a SQL SELECT query into a declared table sink."))
+ .append(
+ formatCommand(
+ "INSERT OVERWRITE",
+ "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
+ .append(
+ formatCommand(
+ "SELECT", "Executes a SQL SELECT query on the Flink cluster."))
+ .append(
+ formatCommand(
+ "SET",
+ "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
+ .append(
+ formatCommand(
+ "SHOW FUNCTIONS",
+ "Shows all user-defined and built-in functions or only user-defined functions. Syntax: 'SHOW [USER] FUNCTIONS;'"))
+ .append(formatCommand("SHOW TABLES", "Shows all registered tables."))
+ .append(
+ formatCommand(
+ "USE CATALOG",
+ "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
+ .append(
+ formatCommand(
+ "USE",
+ "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
+ .append(
+ formatCommand(
+ "LOAD MODULE",
+ "Load a module. Syntax: 'LOAD MODULE <name> [WITH ('<key1>' = "
+ + "'<value1>' [, '<key2>' = '<value2>', ...])];'"))
+ .append(
+ formatCommand(
+ "UNLOAD MODULE",
+ "Unload a module. Syntax: 'UNLOAD MODULE <name>;'"))
+ .append(
+ formatCommand(
+ "USE MODULES",
+ "Enable loaded modules. Syntax: 'USE MODULES <name1> [, <name2>, ...];'"))
+ .append(
+ formatCommand(
+ "BEGIN STATEMENT SET",
+ "Begins a statement set. Syntax: 'BEGIN STATEMENT SET;'"))
+ .append(formatCommand("END", "Ends a statement set. Syntax: 'END;'"))
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ ": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
+ .toAttributedString();
+
+ private static AttributedString formatCommand(String cmd, String description) {
+ return new AttributedStringBuilder()
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(cmd)
+ .append("\t\t")
+ .style(AttributedStyle.DEFAULT)
+ .append(description)
+ .append('\n')
+ .toAttributedString();
+ }
+
+ private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit.";
+
+ private FlinkSqlContext flinkSqlContext;
+ private TableEnvironment tbenv;
+ private ZeppelinContext z;
+ private Parser sqlParser;
+ private SqlSplitter sqlSplitter;
+ // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph.
+ private Map<String, Boolean> statementModeMap = new HashMap<>();
+ private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
+ private boolean isBatch;
+ private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
+
+ public Flink113SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+ this.flinkSqlContext = flinkSqlContext;
+ this.isBatch = isBatch;
+ if (isBatch) {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+ } else {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+ }
+ this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+ this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser();
+ this.sqlSplitter = new SqlSplitter();
+ JobListener jobListener = new JobListener() {
+ @Override
+ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ LOGGER.info("UnLock JobSubmitLock");
+ }
+ }
+
+ @Override
+ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+ }
+ };
+
+ ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+ ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+ }
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context) {
+ try {
+ boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+ statementModeMap.put(context.getParagraphId(), runAsOne);
+ String jobName = context.getLocalProperties().get("jobName");
+ if (StringUtils.isNotBlank(jobName)) {
+ tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
+ }
+
+ List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
+ for (String sql : sqls) {
+ List<Operation> operations = null;
+ try {
+ operations = sqlParser.parse(sql);
+ } catch (SqlParserException e) {
+ context.out.write("%text Invalid Sql statement: " + sql + "\n");
+ context.out.write(MESSAGE_HELP.toString());
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
+ }
+
+ try {
+ callOperation(sql, operations.get(0), context);
+ context.out.flush();
+ } catch (Throwable e) {
+ LOGGER.error("Fail to run sql:" + sql, e);
+ try {
+ context.out.write("%text Fail to run sql command: " +
+ sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+ } catch (IOException ex) {
+ LOGGER.warn("Unexpected exception:", ex);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ ExceptionUtils.getStackTrace(e));
+ }
+ return new InterpreterResult(InterpreterResult.Code.ERROR);
+ }
+ }
+
+ if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
+ try {
+ lock.lock();
+ List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ if (!modifyOperations.isEmpty()) {
+ callInserts(modifyOperations, context);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql as one job", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ statementOperationsMap.remove(context.getParagraphId());
+ statementModeMap.remove(context.getParagraphId());
+ }
+
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ }
+
+ private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException {
+ if (operation instanceof HelpOperation) {
+ callHelp(context);
+ } else if (operation instanceof SetOperation) {
+ callSet((SetOperation) operation, context);
+ } else if (operation instanceof CatalogSinkModifyOperation) {
+ // INSERT INTO/OVERWRITE
+ callInsert((CatalogSinkModifyOperation) operation, context);
+ } else if (operation instanceof QueryOperation) {
+ // SELECT
+ callSelect(sql, (QueryOperation) operation, context);
+ } else if (operation instanceof ExplainOperation) {
+ callExplain((ExplainOperation) operation, context);
+ } else if (operation instanceof BeginStatementSetOperation) {
+ callBeginStatementSet(context);
+ } else if (operation instanceof EndStatementSetOperation) {
+ callEndStatementSet(context);
+ } else if (operation instanceof ShowCatalogsOperation) {
+ callShowCatalogs(context);
+ } else if (operation instanceof ShowCurrentCatalogOperation) {
+ callShowCurrentCatalog(context);
+ } else if (operation instanceof UseCatalogOperation) {
+ callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context);
+ } else if (operation instanceof CreateCatalogOperation) {
+ callDDL(sql, context, "Catalog has been created.");
+ } else if (operation instanceof DropCatalogOperation) {
+ callDDL(sql, context, "Catalog has been dropped.");
+ } else if (operation instanceof UseDatabaseOperation) {
+ UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation;
+ callUseDatabase(useDBOperation.getDatabaseName(), context);
+ } else if (operation instanceof CreateDatabaseOperation) {
+ callDDL(sql, context, "Database has been created.");
+ } else if (operation instanceof DropDatabaseOperation) {
+ callDDL(sql, context, "Database has been removed.");
+ } else if (operation instanceof AlterDatabaseOperation) {
+ callDDL(sql, context, "Alter database succeeded!");
+ } else if (operation instanceof ShowDatabasesOperation) {
+ callShowDatabases(context);
+ } else if (operation instanceof ShowCurrentDatabaseOperation) {
+ callShowCurrentDatabase(context);
+ } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) {
+ callDDL(sql, context, "Table has been created.");
+ } else if (operation instanceof AlterTableOperation) {
+ callDDL(sql, context, "Alter table succeeded!");
+ } else if (operation instanceof DropTableOperation) {
+ callDDL(sql, context, "Table has been dropped.");
+ } else if (operation instanceof DescribeTableOperation) {
+ DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+ callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+ } else if (operation instanceof ShowTablesOperation) {
+ callShowTables(context);
+ } else if (operation instanceof CreateViewOperation) {
+ callDDL(sql, context, "View has been created.");
+ } else if (operation instanceof DropViewOperation) {
+ callDDL(sql, context, "View has been dropped.");
+ } else if (operation instanceof AlterViewOperation) {
+ callDDL(sql, context, "Alter view succeeded!");
+ } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+ callDDL(sql, context, "Function has been created.");
+ } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+ callDDL(sql, context, "Function has been removed.");
+ } else if (operation instanceof AlterCatalogFunctionOperation) {
+ callDDL(sql, context, "Alter function succeeded!");
+ } else if (operation instanceof ShowFunctionsOperation) {
+ callShowFunctions(context);
+ } else if (operation instanceof ShowModulesOperation) {
+ callShowModules(context);
+ } else if (operation instanceof ShowPartitionsOperation) {
+ ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+ callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+ } else {
+ throw new IOException(operation.getClass().getName() + " is not supported");
+ }
+ }
+
+ private void callHelp(InterpreterContext context) throws IOException {
+ context.out.write(MESSAGE_HELP.toString());
+ }
+
+ private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
+ if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ modifyOperations.add(operation);
+ statementOperationsMap.put(context.getParagraphId(), modifyOperations);
+ } else {
+ callInserts(Collections.singletonList(operation), context);
+ }
+ }
+
+ private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+ if (!isBatch) {
+ context.getLocalProperties().put("flink.streaming.insert_into", "true");
+ }
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+ checkState(tableResult.getJobClient().isPresent());
+ try {
+ tableResult.await();
+ JobClient jobClient = tableResult.getJobClient().get();
+ if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+ context.out.write("Insertion successfully.\n");
+ } else {
+ throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Flink job is interrupted", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Flink job is failed", e);
+ }
+ }
+
+ private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+ String explanation =
+ Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+ context.out.write(explanation + "\n");
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ if (isBatch) {
+ callBatchInnerSelect(sql, context);
+ } else {
+ callStreamInnerSelect(sql, context);
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+ Table table = this.tbenv.sqlQuery(sql);
+ String result = z.showData(table);
+ context.out.write(result);
+ }
+
+ public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+ flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+ }
+
+ public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+ if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+ // set a property
+ String key = setOperation.getKey().get().trim();
+ String value = setOperation.getValue().get().trim();
+ this.tbenv.getConfig().getConfiguration().setString(key, value);
+ LOGGER.info("Set table config: {}={}", key, value);
+ } else {
+ // show all properties
+ final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+ List<String> prettyEntries = new ArrayList<>();
+ for (String key : properties.keySet()) {
+ prettyEntries.add(
+ String.format(
+ "'%s' = '%s'",
+ EncodingUtils.escapeSingleQuotes(key),
+ EncodingUtils.escapeSingleQuotes(properties.get(key))));
+ }
+ prettyEntries.sort(String::compareTo);
+ prettyEntries.forEach(entry -> {
+ try {
+ context.out.write(entry + "\n");
+ } catch (IOException e) {
+ LOGGER.warn("Fail to write output", e);
+ }
+ });
+ }
+ }
+
+ private void callBeginStatementSet(InterpreterContext context) throws IOException {
+ statementModeMap.put(context.getParagraphId(), true);
+ }
+
+ private void callEndStatementSet(InterpreterContext context) throws IOException {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+ if (modifyOperations != null && !modifyOperations.isEmpty()) {
+ callInserts(modifyOperations, context);
+ } else {
+ context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+ }
+ statementModeMap.remove(context.getParagraphId());
+ }
+
+ private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+ tbenv.executeSql("USE CATALOG `" + catalog + "`");
+ }
+
+ private void callUseDatabase(String databaseName,
+ InterpreterContext context) throws IOException {
+ this.tbenv.executeSql("USE `" + databaseName + "`");
+ }
+
+ private void callShowCatalogs(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+ List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+ }
+
+ private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+ String catalog = tableResult.collect().next().getField(0).toString();
+ context.out.write("%text current catalog: " + catalog + "\n");
+ }
+
+ private void callShowDatabases(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+ List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+ }
+
+ private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+ String database = tableResult.collect().next().getField(0).toString();
+ context.out.write("%text current database: " + database + "\n");
+ }
+
+ private void callShowTables(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+ List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+ }
+
+ private void callShowFunctions(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+ List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+ }
+
+ private void callShowModules(InterpreterContext context) throws IOException {
+ String[] modules = this.tbenv.listModules();
+ context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+ }
+
+ private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql(sql);
+ List<String> partions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table partitions\n" + StringUtils.join(partions, "\n") + "\n");
+ }
+
+ private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+ try {
+ lock.lock();
+ this.tbenv.executeSql(sql);
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ context.out.write(message + "\n");
+ }
+
+ private void callDescribe(String name, InterpreterContext context) throws IOException {
+ TableResult tableResult = null;
+ try {
+ tableResult = tbenv.executeSql("DESCRIBE " + name);
+ } catch (Exception e) {
+ throw new IOException("Fail to describe table: " + name, e);
+ }
+ CloseableIterator<Row> result = tableResult.collect();
+ StringBuilder builder = new StringBuilder();
+ builder.append("Column\tType\n");
+ while (result.hasNext()) {
+ Row row = result.next();
+ builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+ }
+ context.out.write("%table\n" + builder);
+ }
+}
diff --git a/flink/flink1.14-shims/pom.xml b/flink/flink1.14-shims/pom.xml
index c160548..aa6e1e4 100644
--- a/flink/flink1.14-shims/pom.xml
+++ b/flink/flink1.14-shims/pom.xml
@@ -116,6 +116,13 @@
<dependency>
<groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${flink.scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
index 56fdc7e..e23ac54 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114Shims.java
@@ -22,41 +22,26 @@ import org.apache.commons.cli.CommandLine;
import org.apache.commons.compress.utils.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.python.PythonOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.PlannerType;
-import org.apache.flink.table.api.StatementSet;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.TableException;
-import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
-import org.apache.flink.table.api.config.ExecutionConfigOptions;
-import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.table.api.config.TableConfigOptions;
-import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
-import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
@@ -64,32 +49,6 @@ import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
-import org.apache.flink.table.operations.CatalogSinkModifyOperation;
-import org.apache.flink.table.operations.DescribeTableOperation;
-import org.apache.flink.table.operations.ExplainOperation;
-import org.apache.flink.table.operations.Operation;
-import org.apache.flink.table.operations.QueryOperation;
-import org.apache.flink.table.operations.ShowCatalogsOperation;
-import org.apache.flink.table.operations.ShowDatabasesOperation;
-import org.apache.flink.table.operations.ShowFunctionsOperation;
-import org.apache.flink.table.operations.ShowTablesOperation;
-import org.apache.flink.table.operations.UseCatalogOperation;
-import org.apache.flink.table.operations.UseDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
-import org.apache.flink.table.operations.ddl.AlterTableOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
-import org.apache.flink.table.operations.ddl.CreateDatabaseOperation;
-import org.apache.flink.table.operations.ddl.CreateTableOperation;
-import org.apache.flink.table.operations.ddl.CreateTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.CreateViewOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropCatalogOperation;
-import org.apache.flink.table.operations.ddl.DropDatabaseOperation;
-import org.apache.flink.table.operations.ddl.DropTableOperation;
-import org.apache.flink.table.operations.ddl.DropTempSystemFunctionOperation;
-import org.apache.flink.table.operations.ddl.DropViewOperation;
import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.utils.PrintUtils;
@@ -97,30 +56,19 @@ import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkException;
import org.apache.zeppelin.flink.shims114.CollectStreamTableSink;
-import org.apache.zeppelin.flink.sql.SqlCommandParser;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommand;
-import org.apache.zeppelin.flink.sql.SqlCommandParser.SqlCommandCall;
import org.apache.zeppelin.interpreter.InterpreterContext;
-import org.jline.utils.AttributedString;
-import org.jline.utils.AttributedStringBuilder;
-import org.jline.utils.AttributedStyle;
+import org.apache.zeppelin.interpreter.InterpreterResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.time.ZoneId;
import java.util.Arrays;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.regex.Matcher;
/**
@@ -129,36 +77,22 @@ import java.util.regex.Matcher;
public class Flink114Shims extends FlinkShims {
private static final Logger LOGGER = LoggerFactory.getLogger(Flink114Shims.class);
- public static final AttributedString MESSAGE_HELP = new AttributedStringBuilder()
- .append("The following commands are available:\n\n")
- .append(formatCommand(SqlCommand.CREATE_TABLE, "Create table under current catalog and database."))
- .append(formatCommand(SqlCommand.DROP_TABLE, "Drop table with optional catalog and database. Syntax: 'DROP TABLE [IF EXISTS] <name>;'"))
- .append(formatCommand(SqlCommand.CREATE_VIEW, "Creates a virtual table from a SQL query. Syntax: 'CREATE VIEW <name> AS <query>;'"))
- .append(formatCommand(SqlCommand.DESCRIBE, "Describes the schema of a table with the given name."))
- .append(formatCommand(SqlCommand.DROP_VIEW, "Deletes a previously created virtual table. Syntax: 'DROP VIEW <name>;'"))
- .append(formatCommand(SqlCommand.EXPLAIN, "Describes the execution plan of a query or table with the given name."))
- .append(formatCommand(SqlCommand.HELP, "Prints the available commands."))
- .append(formatCommand(SqlCommand.INSERT_INTO, "Inserts the results of a SQL SELECT query into a declared table sink."))
- .append(formatCommand(SqlCommand.INSERT_OVERWRITE, "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data."))
- .append(formatCommand(SqlCommand.SELECT, "Executes a SQL SELECT query on the Flink cluster."))
- .append(formatCommand(SqlCommand.SET, "Sets a session configuration property. Syntax: 'SET <key>=<value>;'. Use 'SET;' for listing all properties."))
- .append(formatCommand(SqlCommand.SHOW_FUNCTIONS, "Shows all user-defined and built-in functions."))
- .append(formatCommand(SqlCommand.SHOW_TABLES, "Shows all registered tables."))
- .append(formatCommand(SqlCommand.SOURCE, "Reads a SQL SELECT query from a file and executes it on the Flink cluster."))
- .append(formatCommand(SqlCommand.USE_CATALOG, "Sets the current catalog. The current database is set to the catalog's default one. Experimental! Syntax: 'USE CATALOG <name>;'"))
- .append(formatCommand(SqlCommand.USE, "Sets the current default database. Experimental! Syntax: 'USE <name>;'"))
- .style(AttributedStyle.DEFAULT.underline())
- .append("\nHint")
- .style(AttributedStyle.DEFAULT)
- .append(": Make sure that a statement ends with ';' for finalizing (multi-line) statements.")
- .toAttributedString();
-
- private Map<String, StatementSet> statementSetMap = new ConcurrentHashMap<>();
+
+ private Flink114SqlInterpreter batchSqlInterpreter;
+ private Flink114SqlInterpreter streamSqlInterpreter;
public Flink114Shims(FlinkVersion flinkVersion, Properties properties) {
super(flinkVersion, properties);
}
+ public void initInnerBatchSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+ this.batchSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, true);
+ }
+
+ public void initInnerStreamSqlInterpreter(FlinkSqlContext flinkSqlContext) {
+ this.streamSqlInterpreter = new Flink114SqlInterpreter(flinkSqlContext, false);
+ }
+
@Override
public void disableSysoutLogging(Object batchConfig, Object streamConfig) {
// do nothing
@@ -224,31 +158,6 @@ public class Flink114Shims extends FlinkShims {
}
@Override
- public void startMultipleInsert(Object tblEnv, InterpreterContext context) throws Exception {
- StatementSet statementSet = ((TableEnvironment) tblEnv).createStatementSet();
- statementSetMap.put(context.getParagraphId(), statementSet);
- }
-
- @Override
- public void addInsertStatement(String sql, Object tblEnv, InterpreterContext context) throws Exception {
- statementSetMap.get(context.getParagraphId()).addInsertSql(sql);
- }
-
- @Override
- public boolean executeMultipleInsertInto(String jobName, Object tblEnv, InterpreterContext context) throws Exception {
- JobClient jobClient = statementSetMap.get(context.getParagraphId()).execute().getJobClient().get();
- while (!jobClient.getJobStatus().get().isTerminalState()) {
- LOGGER.debug("Wait for job to finish");
- Thread.sleep(1000 * 5);
- }
- if (jobClient.getJobStatus().get() == JobStatus.CANCELED) {
- context.out.write("Job is cancelled.\n");
- return false;
- }
- return true;
- }
-
- @Override
public boolean rowEquals(Object row1, Object row2) {
Row r1 = (Row) row1;
Row r2 = (Row) row2;
@@ -296,151 +205,6 @@ public class Flink114Shims extends FlinkShims {
}
/**
- * Parse it via flink SqlParser first, then fallback to regular expression matching.
- *
- * @param tableEnv
- * @param stmt
- * @return
- */
- @Override
- public Optional<SqlCommandParser.SqlCommandCall> parseSql(Object tableEnv, String stmt) {
- Parser sqlParser = ((TableEnvironmentInternal) tableEnv).getParser();
- SqlCommandCall sqlCommandCall = null;
- try {
- // parse statement via regex matching first
- Optional<SqlCommandCall> callOpt = parseByRegexMatching(stmt);
- if (callOpt.isPresent()) {
- sqlCommandCall = callOpt.get();
- } else {
- sqlCommandCall = parseBySqlParser(sqlParser, stmt);
- }
- } catch (Exception e) {
- return Optional.empty();
- }
- return Optional.of(sqlCommandCall);
-
- }
-
- private SqlCommandCall parseBySqlParser(Parser sqlParser, String stmt) throws Exception {
- List<Operation> operations;
- try {
- operations = sqlParser.parse(stmt);
- } catch (Throwable e) {
- throw new Exception("Invalidate SQL statement.", e);
- }
- if (operations.size() != 1) {
- throw new Exception("Only single statement is supported now.");
- }
-
- final SqlCommand cmd;
- String[] operands = new String[]{stmt};
- Operation operation = operations.get(0);
- if (operation instanceof CatalogSinkModifyOperation) {
- boolean overwrite = ((CatalogSinkModifyOperation) operation).isOverwrite();
- cmd = overwrite ? SqlCommand.INSERT_OVERWRITE : SqlCommand.INSERT_INTO;
- } else if (operation instanceof CreateTableOperation) {
- cmd = SqlCommand.CREATE_TABLE;
- } else if (operation instanceof DropTableOperation) {
- cmd = SqlCommand.DROP_TABLE;
- } else if (operation instanceof AlterTableOperation) {
- cmd = SqlCommand.ALTER_TABLE;
- } else if (operation instanceof CreateViewOperation) {
- cmd = SqlCommand.CREATE_VIEW;
- } else if (operation instanceof DropViewOperation) {
- cmd = SqlCommand.DROP_VIEW;
- } else if (operation instanceof CreateDatabaseOperation) {
- cmd = SqlCommand.CREATE_DATABASE;
- } else if (operation instanceof DropDatabaseOperation) {
- cmd = SqlCommand.DROP_DATABASE;
- } else if (operation instanceof AlterDatabaseOperation) {
- cmd = SqlCommand.ALTER_DATABASE;
- } else if (operation instanceof CreateCatalogOperation) {
- cmd = SqlCommand.CREATE_CATALOG;
- } else if (operation instanceof DropCatalogOperation) {
- cmd = SqlCommand.DROP_CATALOG;
- } else if (operation instanceof UseCatalogOperation) {
- cmd = SqlCommand.USE_CATALOG;
- operands = new String[]{((UseCatalogOperation) operation).getCatalogName()};
- } else if (operation instanceof UseDatabaseOperation) {
- cmd = SqlCommand.USE;
- operands = new String[]{((UseDatabaseOperation) operation).getDatabaseName()};
- } else if (operation instanceof ShowCatalogsOperation) {
- cmd = SqlCommand.SHOW_CATALOGS;
- operands = new String[0];
- } else if (operation instanceof ShowDatabasesOperation) {
- cmd = SqlCommand.SHOW_DATABASES;
- operands = new String[0];
- } else if (operation instanceof ShowTablesOperation) {
- cmd = SqlCommand.SHOW_TABLES;
- operands = new String[0];
- } else if (operation instanceof ShowFunctionsOperation) {
- cmd = SqlCommand.SHOW_FUNCTIONS;
- operands = new String[0];
- } else if (operation instanceof CreateCatalogFunctionOperation ||
- operation instanceof CreateTempSystemFunctionOperation) {
- cmd = SqlCommand.CREATE_FUNCTION;
- } else if (operation instanceof DropCatalogFunctionOperation ||
- operation instanceof DropTempSystemFunctionOperation) {
- cmd = SqlCommand.DROP_FUNCTION;
- } else if (operation instanceof AlterCatalogFunctionOperation) {
- cmd = SqlCommand.ALTER_FUNCTION;
- } else if (operation instanceof ExplainOperation) {
- cmd = SqlCommand.EXPLAIN;
- } else if (operation instanceof DescribeTableOperation) {
- cmd = SqlCommand.DESCRIBE;
- operands = new String[]{((DescribeTableOperation) operation).getSqlIdentifier().asSerializableString()};
- } else if (operation instanceof QueryOperation) {
- cmd = SqlCommand.SELECT;
- } else {
- throw new Exception("Unknown operation: " + operation.asSummaryString());
- }
-
- return new SqlCommandCall(cmd, operands, stmt);
- }
-
- private static Optional<SqlCommandCall> parseByRegexMatching(String stmt) {
- // parse statement via regex matching
- for (SqlCommand cmd : SqlCommand.values()) {
- if (cmd.pattern != null) {
- final Matcher matcher = cmd.pattern.matcher(stmt);
- if (matcher.matches()) {
- final String[] groups = new String[matcher.groupCount()];
- for (int i = 0; i < groups.length; i++) {
- groups[i] = matcher.group(i + 1);
- }
- return cmd.operandConverter.apply(groups)
- .map((operands) -> {
- String[] newOperands = operands;
- if (cmd == SqlCommand.EXPLAIN) {
- // convert `explain xx` to `explain plan for xx`
- // which can execute through executeSql method
- newOperands = new String[]{"EXPLAIN PLAN FOR " + operands[0] + " " + operands[1]};
- }
- return new SqlCommandCall(cmd, newOperands, stmt);
- });
- }
- }
- }
- return Optional.empty();
- }
-
- @Override
- public void executeSql(Object tableEnv, String sql) {
- ((TableEnvironment) tableEnv).executeSql(sql);
- }
-
- @Override
- public String explain(Object tableEnv, String sql) {
- TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
- return tableResult.collect().next().getField(0).toString();
- }
-
- @Override
- public String sqlHelp() {
- return MESSAGE_HELP.toString();
- }
-
- /**
* Flink 1.11 bind CatalogManager with parser which make blink and flink could not share the same CatalogManager.
* This is a workaround which always reset CatalogTableSchemaResolver before running any flink code.
* @param catalogManager
@@ -466,36 +230,6 @@ public class Flink114Shims extends FlinkShims {
}
@Override
- public Map extractTableConfigOptions() {
- Map<String, ConfigOption> configOptions = new HashMap<>();
- configOptions.putAll(extractConfigOptions(ExecutionConfigOptions.class));
- configOptions.putAll(extractConfigOptions(OptimizerConfigOptions.class));
- try {
- configOptions.putAll(extractConfigOptions(PythonOptions.class));
- } catch (NoClassDefFoundError e) {
- LOGGER.warn("No pyflink jars found");
- }
- configOptions.putAll(extractConfigOptions(TableConfigOptions.class));
- return configOptions;
- }
-
- private Map<String, ConfigOption> extractConfigOptions(Class clazz) {
- Map<String, ConfigOption> configOptions = new HashMap();
- Field[] fields = clazz.getDeclaredFields();
- for (Field field : fields) {
- if (field.getType().isAssignableFrom(ConfigOption.class)) {
- try {
- ConfigOption configOption = (ConfigOption) field.get(ConfigOption.class);
- configOptions.put(configOption.key(), configOption);
- } catch (Throwable e) {
- LOGGER.warn("Fail to get ConfigOption", e);
- }
- }
- }
- return configOptions;
- }
-
- @Override
public void setBatchRuntimeMode(Object tableConfig) {
((TableConfig) tableConfig).getConfiguration()
.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
@@ -556,4 +290,12 @@ public class Flink114Shims extends FlinkShims {
(FunctionCatalog) functionCatalog);
return ImmutablePair.of(planner, executor);
}
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context, boolean isBatch) {
+ if (isBatch) {
+ return batchSqlInterpreter.runSqlList(st, context);
+ } else {
+ return streamSqlInterpreter.runSqlList(st, context);
+ }
+ }
}
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
new file mode 100644
index 0000000..f155fcc
--- /dev/null
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.flink;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.SqlParserException;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.*;
+import org.apache.flink.table.operations.command.HelpOperation;
+import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.ddl.*;
+import org.apache.flink.table.utils.EncodingUtils;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Preconditions;
+import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.ZeppelinContext;
+import org.apache.zeppelin.interpreter.util.SqlSplitter;
+import org.jline.utils.AttributedString;
+import org.jline.utils.AttributedStringBuilder;
+import org.jline.utils.AttributedStyle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+public class Flink114SqlInterpreter {
+
+ protected static final Logger LOGGER = LoggerFactory.getLogger(Flink114SqlInterpreter.class);
+
+ private static final String CMD_DESC_DELIMITER = "\t\t";
+
+ /**
+ * SQL Client HELP command helper class.
+ */
+ private static final class SQLCliCommandsDescriptions {
+ private int commandMaxLength;
+ private final Map<String, String> commandsDescriptions;
+
+ public SQLCliCommandsDescriptions() {
+ this.commandsDescriptions = new LinkedHashMap<>();
+ this.commandMaxLength = -1;
+ }
+
+ public SQLCliCommandsDescriptions commandDescription(String command, String description) {
+ Preconditions.checkState(
+ StringUtils.isNotBlank(command), "content of command must not be empty.");
+ Preconditions.checkState(
+ StringUtils.isNotBlank(description),
+ "content of command's description must not be empty.");
+ this.updateMaxCommandLength(command.length());
+ this.commandsDescriptions.put(command, description);
+ return this;
+ }
+
+ private void updateMaxCommandLength(int newLength) {
+ Preconditions.checkState(newLength > 0);
+ if (this.commandMaxLength < newLength) {
+ this.commandMaxLength = newLength;
+ }
+ }
+
+ public AttributedString build() {
+ AttributedStringBuilder attributedStringBuilder = new AttributedStringBuilder();
+ if (!this.commandsDescriptions.isEmpty()) {
+ this.commandsDescriptions.forEach(
+ (cmd, cmdDesc) -> {
+ attributedStringBuilder
+ .style(AttributedStyle.DEFAULT.bold())
+ .append(
+ String.format(
+ String.format("%%-%ds", commandMaxLength), cmd))
+ .append(CMD_DESC_DELIMITER)
+ .style(AttributedStyle.DEFAULT)
+ .append(cmdDesc)
+ .append('\n');
+ });
+ }
+ return attributedStringBuilder.toAttributedString();
+ }
+ }
+
+ private static final AttributedString SQL_CLI_COMMANDS_DESCRIPTIONS =
+ new SQLCliCommandsDescriptions()
+ .commandDescription("HELP", "Prints the available commands.")
+ .commandDescription(
+ "SET",
+ "Sets a session configuration property. Syntax: \"SET '<key>'='<value>';\". Use \"SET;\" for listing all properties.")
+ .commandDescription(
+ "RESET",
+ "Resets a session configuration property. Syntax: \"RESET '<key>';\". Use \"RESET;\" for reset all session properties.")
+ .commandDescription(
+ "INSERT INTO",
+ "Inserts the results of a SQL SELECT query into a declared table sink.")
+ .commandDescription(
+ "INSERT OVERWRITE",
+ "Inserts the results of a SQL SELECT query into a declared table sink and overwrite existing data.")
+ .commandDescription(
+ "SELECT", "Executes a SQL SELECT query on the Flink cluster.")
+ .commandDescription(
+ "EXPLAIN",
+ "Describes the execution plan of a query or table with the given name.")
+ .commandDescription(
+ "BEGIN STATEMENT SET",
+ "Begins a statement set. Syntax: \"BEGIN STATEMENT SET;\"")
+ .commandDescription("END", "Ends a statement set. Syntax: \"END;\"")
+ // (TODO) zjffdu, ADD/REMOVE/SHOW JAR
+ .build();
+
+ // --------------------------------------------------------------------------------------------
+
+ public static final AttributedString MESSAGE_HELP =
+ new AttributedStringBuilder()
+ .append("The following commands are available:\n\n")
+ .append(SQL_CLI_COMMANDS_DESCRIPTIONS)
+ .style(AttributedStyle.DEFAULT.underline())
+ .append("\nHint")
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ ": Make sure that a statement ends with \";\" for finalizing (multi-line) statements.")
+ // About Documentation Link.
+ .style(AttributedStyle.DEFAULT)
+ .append(
+ "\nYou can also type any Flink SQL statement, please visit https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/overview/ for more details.")
+ .toAttributedString();
+
+ private static final String MESSAGE_NO_STATEMENT_IN_STATEMENT_SET = "No statement in the statement set, skip submit.";
+
+ private FlinkSqlContext flinkSqlContext;
+ private TableEnvironment tbenv;
+ private ZeppelinContext z;
+ private Parser sqlParser;
+ private SqlSplitter sqlSplitter;
+ // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph.
+ private Map<String, Boolean> statementModeMap = new HashMap<>();
+ private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
+ private boolean isBatch;
+ private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
+
+
+ public Flink114SqlInterpreter(FlinkSqlContext flinkSqlContext, boolean isBatch) {
+ this.flinkSqlContext = flinkSqlContext;
+ this.isBatch = isBatch;
+ if (isBatch) {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getBtenv();
+ } else {
+ this.tbenv = (TableEnvironment) flinkSqlContext.getStenv();
+ }
+ this.z = (ZeppelinContext) flinkSqlContext.getZeppelinContext();
+ this.sqlParser = ((TableEnvironmentInternal) tbenv).getParser();
+ this.sqlSplitter = new SqlSplitter();
+ JobListener jobListener = new JobListener() {
+ @Override
+ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ LOGGER.info("UnLock JobSubmitLock");
+ }
+ }
+
+ @Override
+ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+ }
+ };
+
+ ((ExecutionEnvironment) flinkSqlContext.getBenv()).registerJobListener(jobListener);
+ ((StreamExecutionEnvironment) flinkSqlContext.getSenv()).registerJobListener(jobListener);
+ }
+
+ public InterpreterResult runSqlList(String st, InterpreterContext context) {
+ try {
+ boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
+ statementModeMap.put(context.getParagraphId(), runAsOne);
+ String jobName = context.getLocalProperties().get("jobName");
+ if (StringUtils.isNotBlank(jobName)) {
+ tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
+ }
+
+ List<String> sqls = sqlSplitter.splitSql(st).stream().map(String::trim).collect(Collectors.toList());
+ for (String sql : sqls) {
+ List<Operation> operations = null;
+ try {
+ operations = sqlParser.parse(sql);
+ } catch (SqlParserException e) {
+ context.out.write("%text Invalid Sql statement: " + sql + "\n");
+ context.out.write(MESSAGE_HELP.toString());
+ return new InterpreterResult(InterpreterResult.Code.ERROR, e.toString());
+ }
+
+ try {
+ callOperation(sql, operations.get(0), context);
+ context.out.flush();
+ } catch (Throwable e) {
+ LOGGER.error("Fail to run sql:" + sql, e);
+ try {
+ context.out.write("%text Fail to run sql command: " +
+ sql + "\n" + ExceptionUtils.getStackTrace(e) + "\n");
+ } catch (IOException ex) {
+ LOGGER.warn("Unexpected exception:", ex);
+ return new InterpreterResult(InterpreterResult.Code.ERROR,
+ ExceptionUtils.getStackTrace(e));
+ }
+ return new InterpreterResult(InterpreterResult.Code.ERROR);
+ }
+ }
+
+ if (runAsOne && !statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>()).isEmpty()) {
+ try {
+ lock.lock();
+ List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ if (!modifyOperations.isEmpty()) {
+ callInserts(modifyOperations, context);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql as one job", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("Fail to execute sql", e);
+ return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
+ } finally {
+ statementOperationsMap.remove(context.getParagraphId());
+ statementModeMap.remove(context.getParagraphId());
+ }
+
+ return new InterpreterResult(InterpreterResult.Code.SUCCESS);
+ }
+
+ private void callOperation(String sql, Operation operation, InterpreterContext context) throws IOException {
+ if (operation instanceof HelpOperation) {
+ // HELP
+ callHelp(context);
+ } else if (operation instanceof SetOperation) {
+ // SET
+ callSet((SetOperation) operation, context);
+ } else if (operation instanceof CatalogSinkModifyOperation) {
+ // INSERT INTO/OVERWRITE
+ callInsert((CatalogSinkModifyOperation) operation, context);
+ } else if (operation instanceof QueryOperation) {
+ // SELECT
+ callSelect(sql, (QueryOperation) operation, context);
+ } else if (operation instanceof ExplainOperation) {
+ // EXPLAIN
+ callExplain((ExplainOperation) operation, context);
+ } else if (operation instanceof BeginStatementSetOperation) {
+ // BEGIN STATEMENT SET
+ callBeginStatementSet(context);
+ } else if (operation instanceof EndStatementSetOperation) {
+ // END
+ callEndStatementSet(context);
+ } else if (operation instanceof ShowCreateTableOperation) {
+ // SHOW CREATE TABLE
+ callShowCreateTable((ShowCreateTableOperation) operation, context);
+ } else if (operation instanceof ShowCatalogsOperation) {
+ callShowCatalogs(context);
+ } else if (operation instanceof ShowCurrentCatalogOperation) {
+ callShowCurrentCatalog(context);
+ } else if (operation instanceof UseCatalogOperation) {
+ callUseCatalog(((UseCatalogOperation) operation).getCatalogName(), context);
+ } else if (operation instanceof CreateCatalogOperation) {
+ callDDL(sql, context, "Catalog has been created.");
+ } else if (operation instanceof DropCatalogOperation) {
+ callDDL(sql, context, "Catalog has been dropped.");
+ } else if (operation instanceof UseDatabaseOperation) {
+ UseDatabaseOperation useDBOperation = (UseDatabaseOperation) operation;
+ callUseDatabase(useDBOperation.getDatabaseName(), context);
+ } else if (operation instanceof CreateDatabaseOperation) {
+ callDDL(sql, context, "Database has been created.");
+ } else if (operation instanceof DropDatabaseOperation) {
+ callDDL(sql, context, "Database has been removed.");
+ } else if (operation instanceof AlterDatabaseOperation) {
+ callDDL(sql, context, "Alter database succeeded!");
+ } else if (operation instanceof ShowDatabasesOperation) {
+ callShowDatabases(context);
+ } else if (operation instanceof ShowCurrentDatabaseOperation) {
+ callShowCurrentDatabase(context);
+ } else if (operation instanceof CreateTableOperation || operation instanceof CreateTableASOperation) {
+ callDDL(sql, context, "Table has been created.");
+ } else if (operation instanceof AlterTableOperation) {
+ callDDL(sql, context, "Alter table succeeded!");
+ } else if (operation instanceof DropTableOperation) {
+ callDDL(sql, context, "Table has been dropped.");
+ } else if (operation instanceof DescribeTableOperation) {
+ DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
+ callDescribe(describeTableOperation.getSqlIdentifier().getObjectName(), context);
+ } else if (operation instanceof ShowTablesOperation) {
+ callShowTables(context);
+ } else if (operation instanceof CreateViewOperation) {
+ callDDL(sql, context, "View has been created.");
+ } else if (operation instanceof DropViewOperation) {
+ callDDL(sql, context, "View has been dropped.");
+ } else if (operation instanceof AlterViewOperation) {
+ callDDL(sql, context, "Alter view succeeded!");
+ } else if (operation instanceof CreateCatalogFunctionOperation || operation instanceof CreateTempSystemFunctionOperation) {
+ callDDL(sql, context, "Function has been created.");
+ } else if (operation instanceof DropCatalogFunctionOperation || operation instanceof DropTempSystemFunctionOperation) {
+ callDDL(sql, context, "Function has been removed.");
+ } else if (operation instanceof AlterCatalogFunctionOperation) {
+ callDDL(sql, context, "Alter function succeeded!");
+ } else if (operation instanceof ShowFunctionsOperation) {
+ callShowFunctions(context);
+ } else if (operation instanceof ShowModulesOperation) {
+ callShowModules(context);
+ } else if (operation instanceof ShowPartitionsOperation) {
+ ShowPartitionsOperation showPartitionsOperation = (ShowPartitionsOperation) operation;
+ callShowPartitions(showPartitionsOperation.asSummaryString(), context);
+ } else {
+ throw new IOException(operation.getClass().getName() + " is not supported");
+ }
+ }
+
+
+ private void callHelp(InterpreterContext context) throws IOException {
+ context.out.write(MESSAGE_HELP.toString() + "\n");
+ }
+
+ private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
+ if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ modifyOperations.add(operation);
+ statementOperationsMap.put(context.getParagraphId(), modifyOperations);
+ } else {
+ callInserts(Collections.singletonList(operation), context);
+ }
+ }
+
+ private void callInserts(List<ModifyOperation> operations, InterpreterContext context) throws IOException {
+ if (!isBatch) {
+ context.getLocalProperties().put("flink.streaming.insert_into", "true");
+ }
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(operations);
+ checkState(tableResult.getJobClient().isPresent());
+ try {
+ tableResult.await();
+ JobClient jobClient = tableResult.getJobClient().get();
+ if (jobClient.getJobStatus().get() == JobStatus.FINISHED) {
+ context.out.write("Insertion successfully.\n");
+ } else {
+ throw new IOException("Job is failed, " + jobClient.getJobExecutionResult().get().toString());
+ }
+ } catch (InterruptedException e) {
+ throw new IOException("Flink job is interrupted", e);
+ } catch (ExecutionException e) {
+ throw new IOException("Flink job is failed", e);
+ }
+ }
+
+ private void callShowCreateTable(ShowCreateTableOperation showCreateTableOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(showCreateTableOperation);
+ String explanation =
+ Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+ context.out.write(explanation + "\n");
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ private void callExplain(ExplainOperation explainOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ TableResult tableResult = ((TableEnvironmentInternal) tbenv).executeInternal(explainOperation);
+ String explanation =
+ Objects.requireNonNull(tableResult.collect().next().getField(0)).toString();
+ context.out.write(explanation + "\n");
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callSelect(String sql, QueryOperation queryOperation, InterpreterContext context) throws IOException {
+ try {
+ lock.lock();
+ if (isBatch) {
+ callBatchInnerSelect(sql, context);
+ } else {
+ callStreamInnerSelect(sql, context);
+ }
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ }
+
+ public void callBatchInnerSelect(String sql, InterpreterContext context) throws IOException {
+ Table table = this.tbenv.sqlQuery(sql);
+ String result = z.showData(table);
+ context.out.write(result);
+ }
+
+ public void callStreamInnerSelect(String sql, InterpreterContext context) throws IOException {
+ flinkSqlContext.getStreamSqlSelectConsumer().accept(sql);
+ }
+
+ public void callSet(SetOperation setOperation, InterpreterContext context) throws IOException {
+ if (setOperation.getKey().isPresent() && setOperation.getValue().isPresent()) {
+ // set a property
+ String key = setOperation.getKey().get().trim();
+ String value = setOperation.getValue().get().trim();
+ this.tbenv.getConfig().getConfiguration().setString(key, value);
+ LOGGER.info("Set table config: {}={}", key, value);
+ } else {
+ // show all properties
+ final Map<String, String> properties = this.tbenv.getConfig().getConfiguration().toMap();
+ List<String> prettyEntries = new ArrayList<>();
+ for (String key : properties.keySet()) {
+ prettyEntries.add(
+ String.format(
+ "'%s' = '%s'",
+ EncodingUtils.escapeSingleQuotes(key),
+ EncodingUtils.escapeSingleQuotes(properties.get(key))));
+ }
+ prettyEntries.sort(String::compareTo);
+ prettyEntries.forEach(entry -> {
+ try {
+ context.out.write(entry + "\n");
+ } catch (IOException e) {
+ LOGGER.warn("Fail to write output", e);
+ }
+ });
+ }
+ }
+
+ private void callBeginStatementSet(InterpreterContext context) throws IOException {
+ statementModeMap.put(context.getParagraphId(), true);
+ }
+
+ private void callEndStatementSet(InterpreterContext context) throws IOException {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
+ if (modifyOperations != null && !modifyOperations.isEmpty()) {
+ callInserts(modifyOperations, context);
+ } else {
+ context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
+ }
+ statementModeMap.remove(context.getParagraphId());
+ }
+
+ private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
+ tbenv.executeSql("USE CATALOG `" + catalog + "`");
+ }
+
+ private void callUseDatabase(String databaseName,
+ InterpreterContext context) throws IOException {
+ this.tbenv.executeSql("USE `" + databaseName + "`");
+ }
+
+ private void callShowCatalogs(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Catalogs");
+ List<String> catalogs = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write("%table catalog\n" + StringUtils.join(catalogs, "\n") + "\n");
+ }
+
+ private void callShowCurrentCatalog(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Catalog");
+ String catalog = tableResult.collect().next().getField(0).toString();
+ context.out.write("%text current catalog: " + catalog + "\n");
+ }
+
+ private void callShowDatabases(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Databases");
+ List<String> databases = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table database\n" + StringUtils.join(databases, "\n") + "\n");
+ }
+
+ private void callShowCurrentDatabase(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Current Database");
+ String database = tableResult.collect().next().getField(0).toString();
+ context.out.write("%text current database: " + database + "\n");
+ }
+
+ private void callShowTables(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Tables");
+ List<String> tables = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .filter(tbl -> !tbl.startsWith("UnnamedTable"))
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table table\n" + StringUtils.join(tables, "\n") + "\n");
+ }
+
+ private void callShowFunctions(InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql("SHOW Functions");
+ List<String> functions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table function\n" + StringUtils.join(functions, "\n") + "\n");
+ }
+
+ private void callShowModules(InterpreterContext context) throws IOException {
+ String[] modules = this.tbenv.listModules();
+ context.out.write("%table modules\n" + StringUtils.join(modules, "\n") + "\n");
+ }
+
+ private void callShowPartitions(String sql, InterpreterContext context) throws IOException {
+ TableResult tableResult = this.tbenv.executeSql(sql);
+ List<String> partions = CollectionUtil.iteratorToList(tableResult.collect()).stream()
+ .map(r -> checkNotNull(r.getField(0)).toString())
+ .collect(Collectors.toList());
+ context.out.write(
+ "%table partitions\n" + StringUtils.join(partions, "\n") + "\n");
+ }
+
+ private void callDDL(String sql, InterpreterContext context, String message) throws IOException {
+ try {
+ lock.lock();
+ this.tbenv.executeSql(sql);
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ lock.unlock();
+ }
+ }
+ context.out.write(message + "\n");
+ }
+
+ private void callDescribe(String name, InterpreterContext context) throws IOException {
+ TableResult tableResult = null;
+ try {
+ tableResult = tbenv.executeSql("DESCRIBE " + name);
+ } catch (Exception e) {
+ throw new IOException("Fail to describe table: " + name, e);
+ }
+ CloseableIterator<Row> result = tableResult.collect();
+ StringBuilder builder = new StringBuilder();
+ builder.append("Column\tType\n");
+ while (result.hasNext()) {
+ Row row = result.next();
+ builder.append(row.getField(0) + "\t" + row.getField(1) + "\n");
+ }
+ context.out.write("%table\n" + builder.toString());
+ }
+}