You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/08/31 04:14:36 UTC

[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5020]. Unable create temporary view in flink

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new db1a95b  [ZEPPELIN-5020]. Unable create temporary view in flink
db1a95b is described below

commit db1a95bb42e9ab34e5b112da1e37f0d31b716fe0
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Aug 27 19:28:26 2020 +0800

    [ZEPPELIN-5020]. Unable create temporary view in flink
    
    ### What is this PR for?
    
    This issue is caused by an API change in Flink 1.11. This PR fixes the issue and also adds a new test for this case.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5020
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3894 from zjffdu/ZEPPELIN-5020 and squashes the following commits:
    
    e9551bfee [Jeff Zhang] [ZEPPELIN-5020]. Unable create temporary view in flink
    
    (cherry picked from commit e51f3ebf7c491663247ed31a2bf84861e1174bbf)
    Signed-off-by: Jeff Zhang <zj...@apache.org>
---
 .../java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java   | 10 +++++++---
 .../java/org/apache/zeppelin/flink/SqlInterpreterTest.java    | 11 +++++++++++
 2 files changed, 18 insertions(+), 3 deletions(-)

diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 81a0bce..2e1fe71 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -264,7 +264,7 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
         callDropTable(cmdCall.operands[0], context);
         break;
       case CREATE_VIEW:
-        callCreateView(cmdCall.operands[0], cmdCall.operands[1], context);
+        callCreateView(cmdCall, context);
         break;
       case DROP_VIEW:
         callDropView(cmdCall.operands[0], context);
@@ -337,10 +337,14 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
     context.out.write("View has been dropped.\n");
   }
 
-  private void callCreateView(String name, String query, InterpreterContext context) throws IOException {
+  private void callCreateView(SqlCommandParser.SqlCommandCall sqlCommand, InterpreterContext context) throws IOException {
     try {
       lock.lock();
-      this.tbenv.createTemporaryView(name, tbenv.sqlQuery(query));
+      if (flinkInterpreter.getFlinkVersion().isFlink110()) {
+        this.tbenv.createTemporaryView(sqlCommand.operands[0], tbenv.sqlQuery(sqlCommand.operands[1]));
+      } else {
+        flinkInterpreter.getFlinkShims().executeSql(tbenv, sqlCommand.sql);
+      }
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 0b621ce..c978533 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -352,6 +352,17 @@ public abstract class SqlInterpreterTest {
     resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(1, resultMessages.size());
     assertEquals("View has been dropped.\n", resultMessages.get(0).getData());
+
+    // create temporary view
+    if (!flinkInterpreter.getFlinkVersion().isFlink110()) {
+      context = getInterpreterContext();
+      result = sqlInterpreter.interpret("create temporary view my_temp_view as select int_col from source_table", context);
+      assertEquals(result.toString(), Code.SUCCESS, result.code());
+      resultMessages = context.out.toInterpreterResultMessage();
+      assertEquals(1, resultMessages.size());
+      assertEquals(Type.TEXT, resultMessages.get(0).getType());
+      assertEquals("View has been created.\n", resultMessages.get(0).getData());
+    }
   }
 
   @Test