You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/10/12 02:01:55 UTC

[zeppelin] branch master updated: [ZEPPELIN-5088]. Explain statement is broken in flink interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new d4c18c1  [ZEPPELIN-5088]. Explain statement is broken in flink interpreter
d4c18c1 is described below

commit d4c18c1c46897b14828487a050db887bbcc36a0a
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Oct 7 16:02:22 2020 +0800

    [ZEPPELIN-5088]. Explain statement is broken in flink interpreter
    
    ### What is this PR for?
    
    This PR fixes the explain statement issue in the flink interpreter. For flink 1.10 and 1.11, explain works in different ways, so we need to use shims to handle that. This PR also adds a unit test to prevent regressions in the future.
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5088
    
    ### How should this be tested?
    * CI pass
    https://travis-ci.org/github/zjffdu/zeppelin/builds/734175742
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Is there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3936 from zjffdu/ZEPPELIN-5088 and squashes the following commits:
    
    83ac341be [Jeff Zhang] [ZEPPELIN-5088]. Explain statement is broken in flink interpreter
---
 .../java/org/apache/zeppelin/flink/FlinkShims.java |  2 ++
 .../zeppelin/flink/sql/SqlCommandParser.java       |  6 +++--
 .../org/apache/zeppelin/flink/Flink110Shims.java   | 26 ++++++++++++++----
 .../CollectStreamTableSink.java                    |  2 +-
 .../Flink110ScalaShims.scala                       |  2 +-
 .../org/apache/zeppelin/flink/Flink111Shims.java   |  7 +++++
 .../apache/zeppelin/flink/FlinkSqlInterrpeter.java |  3 +--
 .../apache/zeppelin/flink/SqlInterpreterTest.java  | 31 ++++++++++++++++++++++
 8 files changed, 68 insertions(+), 11 deletions(-)

diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
index 55c3b00..f7a7514 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/FlinkShims.java
@@ -126,6 +126,8 @@ public abstract class FlinkShims {
 
   public abstract void executeSql(Object tableEnv, String sql);
 
+  public abstract String explain(Object tableEnv, String sql);
+
   public abstract String sqlHelp();
 
   public abstract void setCatalogManagerSchemaResolver(Object catalogManager,
diff --git a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
index d28d7f6..1a4f0e3 100644
--- a/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
+++ b/flink/flink-shims/src/main/java/org/apache/zeppelin/flink/sql/SqlCommandParser.java
@@ -112,8 +112,10 @@ public final class SqlCommandParser {
             SINGLE_OPERAND),
 
     EXPLAIN(
-            "EXPLAIN\\s+(.*)",
-            SINGLE_OPERAND),
+            "EXPLAIN\\s+(SELECT|INSERT)\\s+(.*)",
+            (operands) -> {
+              return Optional.of(new String[] { operands[0], operands[1] });
+            }),
 
     CREATE_DATABASE(
             "(CREATE\\s+DATABASE\\s+.*)",
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
index fbb1379..e917b3c 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/Flink110Shims.java
@@ -40,8 +40,8 @@ import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.sinks.TableSink;
 import org.apache.flink.types.Row;
-import org.apache.zeppelin.flink.shims111.CollectStreamTableSink;
-import org.apache.zeppelin.flink.shims111.Flink110ScalaShims;
+import org.apache.zeppelin.flink.shims110.CollectStreamTableSink;
+import org.apache.zeppelin.flink.shims110.Flink110ScalaShims;
 import org.apache.zeppelin.flink.sql.SqlCommandParser;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.jline.utils.AttributedString;
@@ -200,9 +200,19 @@ public class Flink110Shims extends FlinkShims {
         for (int i = 0; i < groups.length; i++) {
           groups[i] = matcher.group(i + 1);
         }
-        final String sql = stmt;
-        return cmd.operandConverter.apply(groups)
-                .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, operands, sql));
+        if (cmd == SqlCommandParser.SqlCommand.EXPLAIN) {
+          String[] operands = cmd.operandConverter.apply(groups).get();
+          if (operands[0].equalsIgnoreCase("select")) {
+            // flink 1.10 only supports explain select statement.
+            String[] newOperands = new String[]{operands[0] + " " + operands[1]};
+            return Optional.of(new SqlCommandParser.SqlCommandCall(cmd, newOperands, stmt));
+          } else {
+            return Optional.empty();
+          }
+        } else {
+          return cmd.operandConverter.apply(groups)
+                  .map((operands) -> new SqlCommandParser.SqlCommandCall(cmd, operands, stmt));
+        }
       }
     }
     return Optional.empty();
@@ -214,6 +224,12 @@ public class Flink110Shims extends FlinkShims {
   }
 
   @Override
+  public String explain(Object tableEnv, String sql) {
+    Table table = ((TableEnvironment) tableEnv).sqlQuery(sql);
+    return ((TableEnvironment) tableEnv).explain(table);
+  }
+
+  @Override
   public String sqlHelp() {
     return MESSAGE_HELP.toString();
   }
diff --git a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java
similarity index 98%
rename from flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
rename to flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java
index b7a0ea7..925e3a7 100644
--- a/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims111/CollectStreamTableSink.java
+++ b/flink/flink1.10-shims/src/main/java/org/apache/zeppelin/flink/shims110/CollectStreamTableSink.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.flink.shims111;
+package org.apache.zeppelin.flink.shims110;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeinfo.Types;
diff --git a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
similarity index 97%
rename from flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala
rename to flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
index cfd8894..b9978f5 100644
--- a/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims111/Flink110ScalaShims.scala
+++ b/flink/flink1.10-shims/src/main/scala/org/apache/zeppelin/flink/shims110/Flink110ScalaShims.scala
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.zeppelin.flink.shims111
+package org.apache.zeppelin.flink.shims110
 
 import org.apache.flink.api.scala.{DataSet, FlinkILoop}
 import org.apache.flink.table.api.Table
diff --git a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
index 2b9185b..8fad5d4 100644
--- a/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
+++ b/flink/flink1.11-shims/src/main/java/org/apache/zeppelin/flink/Flink111Shims.java
@@ -33,6 +33,7 @@ import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.StatementSet;
 import org.apache.flink.table.api.Table;
 import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
 import org.apache.flink.table.api.bridge.scala.BatchTableEnvironment;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
@@ -374,6 +375,12 @@ public class Flink111Shims extends FlinkShims {
   }
 
   @Override
+  public String explain(Object tableEnv, String sql) {
+    TableResult tableResult = ((TableEnvironment) tableEnv).executeSql(sql);
+    return tableResult.collect().next().getField(0).toString();
+  }
+
+  @Override
   public String sqlHelp() {
     return MESSAGE_HELP.toString();
   }
diff --git a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index 02e3597..d85e170 100644
--- a/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/interpreter/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -469,8 +469,7 @@ public abstract class FlinkSqlInterrpeter extends AbstractInterpreter {
   private void callExplain(String sql, InterpreterContext context) throws IOException {
     try {
       lock.lock();
-      Table table = this.tbenv.sqlQuery(sql);
-      context.out.write(this.tbenv.explain(table) + "\n");
+      context.out.write(this.flinkInterpreter.getFlinkShims().explain(tbenv, sql) + "\n");
     } finally {
       if (lock.isHeldByCurrentThread()) {
         lock.unlock();
diff --git a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index c978533..2bd9ee8 100644
--- a/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/interpreter/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -366,6 +366,37 @@ public abstract class SqlInterpreterTest {
   }
 
   @Test
+  public void testExplain() throws InterpreterException, IOException {
+    // create table
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE TABLE source_table (int_col INT, double_col double, " +
+                    "varchar_col varchar, bool_col boolean)" +
+                    " WITH (\n" +
+                    "'format.field-delimiter'='\\n',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='hdfs:///tmp/bank.csv',\n" +
+                    "'format.type'='csv'\n" +
+                    ");",
+            context);
+    assertEquals(Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(Type.TEXT, resultMessages.get(0).getType());
+    assertEquals("Table has been created.\n", resultMessages.get(0).getData());
+
+    // explain select statement.
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("explain select * from source_table", context);
+    assertEquals(Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(1, resultMessages.size());
+    assertEquals(Type.TEXT, resultMessages.get(0).getType());
+    assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("Physical Execution Plan"));
+  }
+
+  @Test
   public void testInvalidSql() throws InterpreterException, IOException {
 
     InterpreterContext context = getInterpreterContext();