You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/11 13:56:36 UTC

[zeppelin] branch master updated: [ZEPPELIN-4750]. Don't display temporary tables in show tables of flink interpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 07a7f93  [ZEPPELIN-4750]. Don't display temporary tables in show tables of flink interpreter
07a7f93 is described below

commit 07a7f93a5e4cc41a5d5ed915718131e402eb87e3
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Apr 10 22:42:27 2020 +0800

    [ZEPPELIN-4750]. Don't display temporary tables in show tables of flink interpreter
    
    ### What is this PR for?
    
    This PR filters temporary tables out of the `show tables` output so that only permanent tables are displayed. Otherwise users would be confused by the many temporary tables that Zeppelin creates internally.
    
    ### What type of PR is it?
    [ Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4750
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3731 from zjffdu/ZEPPELIN-4750 and squashes the following commits:
    
    85fe8b14b [Jeff Zhang] [ZEPPELIN-4750]. Don't display temporary tables in show tables of flink interpreter
---
 .../org/apache/zeppelin/flink/FlinkSqlInterrpeter.java |  6 +++++-
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java       |  2 +-
 .../apache/zeppelin/flink/FlinkZeppelinContext.scala   |  2 +-
 .../zeppelin/flink/FlinkBatchSqlInterpreterTest.java   | 12 ++++++++++++
 .../zeppelin/flink/FlinkStreamSqlInterpreterTest.java  | 18 ++++++++++++++++++
 .../org/apache/zeppelin/flink/SqlInterpreterTest.java  |  8 ++++++++
 6 files changed, 45 insertions(+), 3 deletions(-)

diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
index c85dbf4..2a5c2d9 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkSqlInterrpeter.java
@@ -18,6 +18,7 @@
 
 package org.apache.zeppelin.flink;
 
+import com.google.common.collect.Lists;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -55,6 +56,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 public abstract class FlinkSqlInterrpeter extends Interpreter {
 
@@ -416,7 +418,9 @@ public abstract class FlinkSqlInterrpeter extends Interpreter {
   }
 
   private void callShowTables(InterpreterContext context) throws IOException {
-    String[] tables = this.tbenv.listTables();
+    List<String> tables =
+            Lists.newArrayList(this.tbenv.listTables()).stream()
+                    .filter(tbl -> !tbl.startsWith("UnnamedTable")).collect(Collectors.toList());
     context.out.write(
             "%table table\n" + StringUtils.join(tables, "\n") + "\n");
   }
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 9469b89..f293fad 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -97,7 +97,7 @@ public abstract class AbstractStreamSqlJob {
 
   public InterpreterResult run(String st) throws IOException {
     Table table = stenv.sqlQuery(st);
-    String tableName = st + "_" + SQL_INDEX.getAndIncrement();
+    String tableName = "UnnamedTable_" + st + "_" + SQL_INDEX.getAndIncrement();
     return run(table, tableName);
   }
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 03be11b..3694f9a 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -134,7 +134,7 @@ class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
     val stenv = flinkInterpreter.getStreamTableEnvironment()
     val context = InterpreterContext.get()
     configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
-    val tableName = context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
+    val tableName = "UnnamedTable_" + context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
     if (streamType.equalsIgnoreCase("single")) {
       val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
         stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
index fb51d57..3a34273 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkBatchSqlInterpreterTest.java
@@ -135,6 +135,18 @@ public class FlinkBatchSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(1, resultMessages.size());
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("name\nA\nB\n", resultMessages.get(0).getData());
+
+    // after these select queries, `show tables` should still show only one source table,
+    // other temporary tables should not be displayed.
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("show tables", context);
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals(resultMessages.get(0).toString(),
+            "table\nsource_table\n", resultMessages.get(0).getData());
+
   }
 
   @Test
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index d920e7e..943906f 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -75,6 +75,13 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
     assertTrue(resultMessages.toString(),
             resultMessages.get(0).getData().contains("Total Count"));
+
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("show tables", context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("table\nlog\n", resultMessages.get(0).getData());
   }
 
   @Test
@@ -198,6 +205,17 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
             getInterpreterContext());
 
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    // after these select queries, `show tables` should still show only one source table,
+    // other temporary tables should not be displayed.
+    InterpreterContext context = getInterpreterContext();
+    result = sqlInterpreter.interpret("show tables", context);
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(1, resultMessages.size());
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals(resultMessages.get(0).toString(),
+            "table\ndest_table\nsource_table\n", resultMessages.get(0).getData());
   }
 
   @Test
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 34dc05c..3ee48e5 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -331,6 +331,14 @@ public abstract class SqlInterpreterTest {
     assertEquals(Type.TEXT, resultMessages.get(0).getType());
     assertTrue(resultMessages.get(0).getData(), resultMessages.get(0).getData().contains("already exists"));
 
+    // show tables
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("show tables", context);
+    assertEquals(Code.SUCCESS, result.code());
+    resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(Type.TABLE, resultMessages.get(0).getType());
+    assertEquals("table\nmy_view\nsource_table\n", resultMessages.get(0).getData());
+
     // drop table
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("drop view my_view", context);