You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/08/31 04:14:36 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5020]. Unable
create temporary view in flink
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new db1a95b [ZEPPELIN-5020]. Unable create temporary view in flink
db1a95b is described below
commit db1a95bb42e9ab34e5b112da1e37f0d31b716fe0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Aug 27 19:28:26 2020 +0800
[ZEPPELIN-5020]. Unable create temporary view in flink
### What is this PR for?
This is due to an API change in Flink 1.11. This PR fixes the issue and also adds a new test for this case.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5020
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3894 from zjffdu/ZEPPELIN-5020 and squashes the following commits:
e9551bfee [Jeff Zhang] [ZEPPELIN-5020]. Unable create temporary view in flink
(cherry picked from commit e51f3ebf7c491663247ed31a2bf84861e1174bbf)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java | 10 +++++++---
.../java/org/apache/zeppelin/flink/SqlInterpreterTest.java | 11 +++++++++++
2 files changed, 18 insertions(+), 3 deletions(-)
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 81a0bce..2e1fe71 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -264,7 +264,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
callDropTable(cmdCall.operands[0], context);
break;
case CREATE_VIEW:
- callCreateView(cmdCall.operands[0], cmdCall.operands[1], context);
+ callCreateView(cmdCall, context);
break;
case DROP_VIEW:
callDropView(cmdCall.operands[0], context);
@@ -337,10 +337,14 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
context.out.write("View has been dropped.\n");
}
- private void callCreateView(String name, String query, InterpreterContext context) throws IOException {
+ private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException {
try {
lock.lock();
- this.tbenv.createTemporaryView(name, tbenv.sqlQuery(query));
+ if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+ this.tbenv.createTemporaryView(sqlCommand.operands[0], tbenv.sqlQuery(sqlCommand.operands[1]));
+ } else {
+ flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
+ }
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 0b621ce..c978533 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -352,6 +352,17 @@ public abstract class SqlInterpreterTest {
resultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, resultMessages.size());
assertEquals("View has been dropped.\n", resultMessages.get(0).getData());
+
+ // create temporary view
+ if (!flinkInterpreter.getFlinkVersion().isFlink110()) {
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("create temporary view my_temp_view as select int_col from source_table", context);
+ assertEquals(result.toString(), Code.SUCCESS, result.code());
+ resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(1, resultMessages.size());
+ assertEquals(Type.TEXT, resultMessages.get(0).getType());
+ assertEquals("View has been created.\n", resultMessages.get(0).getData());
+ }
}
@Test