Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/11 10:07:41 UTC

[zeppelin] branch master updated: [ZEPPELIN-4740]. Display streaming data in flink table api

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 80a3856  [ZEPPELIN-4740]. Display streaming data in flink table api
80a3856 is described below

commit 80a3856bc722f57e4603e276e88920633a15248f
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Apr 9 17:20:05 2020 +0800

    [ZEPPELIN-4740]. Display streaming data in flink table api
    
    ### What is this PR for?
    This PR allows users to display streaming data via the Flink Table API, just like displaying streaming data in stream SQL (%flink.ssql). It is implemented in both Scala and PyFlink.
    
    Here's an example using the Flink Scala Table API:
    
    ```
    val table = stenv.from("cdn_access_log")
       .select("uuid, ip_to_province(client_ip) as province, response_size, request_time")
       .groupBy("province")
       .select( "province, count(uuid) as access_count, sum(response_size) as total_download,  sum(response_size) * 1.0 / sum(request_time) as download_speed")
    z.show(table, streamType="update")
    ```
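    
    A PyFlink counterpart is sketched below. It mirrors the tests added in this commit: the log table is the one registered by the tests' init_stream.scala, so treat it as a stand-in for any registered streaming table. Note that the Python API takes the stream type through the stream_type keyword (not streamType), and extra keyword arguments such as template are forwarded as configs:
    
    ```
    # "update" mode: the displayed aggregation result is continuously updated
    table = st_env.sql_query("select url, count(1) as pv from log group by url")
    z.show(table, stream_type="update")
    
    # "single" mode: render the latest single row through an HTML template
    table = st_env.sql_query("select max(rowtime), count(1) from log")
    z.show(table, stream_type="single", template="Total Count: {1} <br/> {0}")
    ```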
    
    ### What type of PR is it?
    [Feature]
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4740
    
    ### How should this be tested?
    Unit tests are added, and the feature was also verified manually.
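    
    For instance, the new interpreter tests can be run from the repo root with Maven (a sketch, assuming the standard Surefire setup of the flink module):
    
    ```
    mvn test -pl flink -Dtest=FlinkStreamSqlInterpreterTest
    ```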
    
    ### Screenshots (if appropriate)
    ![image](https://user-images.githubusercontent.com/164491/78974396-75a59580-7b44-11ea-91be-e962d630e739.png)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3729 from zjffdu/ZEPPELIN-4740 and squashes the following commits:
    
    bcd25978e [Jeff Zhang] [ZEPPELIN-4740]. Display streaming data in flink table api
---
 flink/pom.xml                                      |   2 +-
 .../apache/zeppelin/flink/FlinkInterpreter.java    |   4 +-
 .../java/org/apache/zeppelin/flink/JobManager.java |   6 ++
 .../zeppelin/flink/sql/AbstractStreamSqlJob.java   |  11 ++-
 .../src/main/resources/python/zeppelin_ipyflink.py |   5 +-
 .../src/main/resources/python/zeppelin_pyflink.py  |   5 +-
 .../zeppelin/flink/FlinkScalaInterpreter.scala     |  20 +++-
 .../zeppelin/flink/FlinkZeppelinContext.scala      |  53 ++++++++--
 .../zeppelin/flink/FlinkInterpreterTest.java       |   2 +-
 .../flink/FlinkStreamSqlInterpreterTest.java       |  59 ++++++++++-
 .../zeppelin/flink/IPyFlinkInterpreterTest.java    | 110 +++++++++++++++++++--
 .../zeppelin/flink/PyFlinkInterpreterTest.java     |  25 ++++-
 .../apache/zeppelin/flink/SqlInterpreterTest.java  |   6 +-
 13 files changed, 265 insertions(+), 43 deletions(-)

diff --git a/flink/pom.xml b/flink/pom.xml
index 58665bb..d74c0d9 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -634,7 +634,7 @@
           <!-- set sun.zip.disableMemoryMapping=true because of
           https://blogs.oracle.com/poonam/crashes-in-zipgetentry
           https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
-          <argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+          <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
 
           <environmentVariables>
             <FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index af4de3d..a9afc1d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -110,7 +110,7 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   StreamTableEnvironment getStreamTableEnvironment() {
-    return this.innerIntp.getStreamTableEnvironment();
+    return this.innerIntp.getStreamTableEnvironment("blink");
   }
 
   org.apache.flink.table.api.TableEnvironment getJavaBatchTableEnvironment(String planner) {
@@ -122,7 +122,7 @@ public class FlinkInterpreter extends Interpreter {
   }
 
   TableEnvironment getBatchTableEnvironment() {
-    return this.innerIntp.getBatchTableEnvironment();
+    return this.innerIntp.getBatchTableEnvironment("blink");
   }
 
   JobManager getJobManager() {
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 527b0f7..8e4e870 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -140,6 +140,12 @@ public class JobManager {
     jobProgressPoller.interrupt();
   }
 
+  public void shutdown() {
+    for (FlinkJobProgressPoller jobProgressPoller : jobProgressPollerMap.values()) {
+      jobProgressPoller.cancel();
+    }
+  }
+
   class FlinkJobProgressPoller extends Thread {
 
     private String flinkWebUI;
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 30c4f45..9469b89 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -96,11 +96,15 @@ public abstract class AbstractStreamSqlJob {
   protected abstract String getType();
 
   public InterpreterResult run(String st) throws IOException {
+    Table table = stenv.sqlQuery(st);
+    String tableName = st + "_" + SQL_INDEX.getAndIncrement();
+    return run(table, tableName);
+  }
+
+  public InterpreterResult run(Table table, String tableName) throws IOException {
     try {
       int parallelism = Integer.parseInt(context.getLocalProperties()
               .getOrDefault("parallelism", defaultParallelism + ""));
-
-      Table table = stenv.sqlQuery(st);
       this.schema = removeTimeAttributes(table.getSchema());
       checkTableSchema(schema);
 
@@ -132,7 +136,6 @@ public abstract class AbstractStreamSqlJob {
       try {
         stenv.useCatalog("default_catalog");
         stenv.useDatabase("default_database");
-        String tableName = st + "_" + SQL_INDEX.getAndIncrement();
         stenv.registerTableSink(tableName, collectTableSink);
         table.insertInto(new StreamQueryConfig(), tableName);
       } finally {
@@ -149,7 +152,7 @@ public abstract class AbstractStreamSqlJob {
       retrievalThread.start();
 
       LOGGER.info("Run job without savePointPath, " + ", parallelism: " + parallelism);
-      stenv.execute(st);
+      stenv.execute(tableName);
       LOGGER.info("Flink Job is finished");
       // wait for retrieve thread consume all data
       LOGGER.info("Waiting for retrieve thread to be done");
diff --git a/flink/src/main/resources/python/zeppelin_ipyflink.py b/flink/src/main/resources/python/zeppelin_ipyflink.py
index 0c1f2e7..fe94c9f 100644
--- a/flink/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/src/main/resources/python/zeppelin_ipyflink.py
@@ -60,7 +60,10 @@ class IPyFlinkZeppelinContext(PyZeppelinContext):
     def show(self, obj, **kwargs):
         from pyflink.table import Table
         if isinstance(obj, Table):
-            print(self.z.showData(obj._j_table))
+            if 'stream_type' in kwargs:
+                self.z.show(obj._j_table, kwargs['stream_type'], kwargs)
+            else:
+                print(self.z.showData(obj._j_table))
         else:
             super(IPyFlinkZeppelinContext, self).show(obj, **kwargs)
 
diff --git a/flink/src/main/resources/python/zeppelin_pyflink.py b/flink/src/main/resources/python/zeppelin_pyflink.py
index a2bdaa0..8a401b2 100644
--- a/flink/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/src/main/resources/python/zeppelin_pyflink.py
@@ -52,7 +52,10 @@ class PyFlinkZeppelinContext(PyZeppelinContext):
   def show(self, obj, **kwargs):
     from pyflink.table import Table
     if isinstance(obj, Table):
-      print(self.z.showData(obj._j_table))
+      if 'stream_type' in kwargs:
+        self.z.show(obj._j_table, kwargs['stream_type'], kwargs)
+      else:
+        print(self.z.showData(obj._j_table))
     else:
       super(PyFlinkZeppelinContext, self).show(obj, **kwargs)
 
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 32962da..b45637c 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -360,7 +360,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
     flinkILoop.interpret("import org.apache.flink.table.functions.AggregateFunction")
     flinkILoop.interpret("import org.apache.flink.table.functions.TableFunction")
 
-    this.z = new FlinkZeppelinContext(this.btenv, this.btenv_2, new InterpreterHookRegistry(),
+    this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(),
       Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000")))
     val modifiers = new java.util.ArrayList[String]()
     modifiers.add("@transient")
@@ -638,7 +638,6 @@ class FlinkScalaInterpreter(val properties: Properties) {
             LOGGER.info("Don't close the Remote FlinkCluster")
         }
       }
-
     } else {
       LOGGER.info("Keep cluster alive when closing interpreter")
     }
@@ -647,6 +646,9 @@ class FlinkScalaInterpreter(val properties: Properties) {
       flinkILoop.closeInterpreter()
       flinkILoop = null
     }
+    if (jobManager != null) {
+      jobManager.shutdown()
+    }
   }
 
   private def cleanupStagingDirInternal(appId: ApplicationId): Unit = {
@@ -666,9 +668,19 @@ class FlinkScalaInterpreter(val properties: Properties) {
 
   def getStreamExecutionEnvironment(): StreamExecutionEnvironment = this.senv
 
-  def getBatchTableEnvironment(): TableEnvironment = this.btenv
+  def getBatchTableEnvironment(planner: String = "blink"): TableEnvironment = {
+    if (planner == "blink")
+      this.btenv
+    else
+      this.btenv_2
+  }
 
-  def getStreamTableEnvironment(): StreamTableEnvironment = this.stenv
+  def getStreamTableEnvironment(planner: String = "blink"): StreamTableEnvironment = {
+    if (planner == "blink")
+      this.stenv
+    else
+      this.stenv_2
+  }
 
   def getJavaBatchTableEnvironment(planner: String): TableEnvironment = {
     if (planner == "blink") {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 4f71622..03be11b 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -18,16 +18,21 @@
 
 package org.apache.zeppelin.flink
 
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.collect.Maps
 import org.apache.flink.api.scala.DataSet
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.table.api.internal.TableImpl
 import org.apache.flink.table.api.{Table, TableEnvironment, TableUtils}
-import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
 import org.apache.flink.types.Row
 import org.apache.flink.util.StringUtils
 import org.apache.zeppelin.annotation.ZeppelinApi
 import org.apache.zeppelin.display.AngularObjectWatcher
 import org.apache.zeppelin.display.ui.OptionInput.ParamOption
+import org.apache.zeppelin.flink.sql.{AppendStreamSqlJob, SingleRowStreamSqlJob, UpdateStreamSqlJob}
 import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterHookRegistry, ResultMessages, ZeppelinContext}
 import org.apache.zeppelin.tabledata.TableDataUtils
 
@@ -37,11 +42,11 @@ import scala.collection.{JavaConversions, Seq}
 /**
   * ZeppelinContext for Flink
   */
-class FlinkZeppelinContext(val btenv: TableEnvironment,
-                           val btenv_2: TableEnvironment,
+class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
                            val hooks2: InterpreterHookRegistry,
                            val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) {
 
+  private val SQL_INDEX = new AtomicInteger(0)
   private var currentSql: String = _
 
   private val interpreterClassMap = Map(
@@ -98,10 +103,10 @@ class FlinkZeppelinContext(val btenv: TableEnvironment,
   override def showData(obj: Any, maxResult: Int): String = {
     if (obj.isInstanceOf[DataSet[_]]) {
       val ds = obj.asInstanceOf[DataSet[_]]
-      val env = btenv_2.asInstanceOf[BatchTableEnvironment]
-      val table = env.fromDataSet(ds)
+      val btenv = flinkInterpreter.getBatchTableEnvironment("flink").asInstanceOf[BatchTableEnvironment]
+      val table = btenv.fromDataSet(ds)
       val columnNames: Array[String] = table.getSchema.getFieldNames
-      val dsRows: DataSet[Row] = env.toDataSet[Row](table)
+      val dsRows: DataSet[Row] = btenv.toDataSet[Row](table)
       showTable(columnNames, dsRows.first(maxResult + 1).collect())
     } else if (obj.isInstanceOf[Table]) {
       val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(obj.asInstanceOf[TableImpl])).toSeq
@@ -114,7 +119,8 @@ class FlinkZeppelinContext(val btenv: TableEnvironment,
 
   def showFlinkTable(table: Table): String = {
     val columnNames: Array[String] = table.getSchema.getFieldNames
-    val dsRows: DataSet[Row] = btenv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+    val dsRows: DataSet[Row] = flinkInterpreter.getJavaBatchTableEnvironment("flink")
+      .asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
     showTable(columnNames, dsRows.first(maxResult + 1).collect())
   }
 
@@ -124,6 +130,39 @@ class FlinkZeppelinContext(val btenv: TableEnvironment,
     showTable(columnNames, rows)
   }
 
+  def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = {
+    val stenv = flinkInterpreter.getStreamTableEnvironment()
+    val context = InterpreterContext.get()
+    configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
+    val tableName = context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
+    if (streamType.equalsIgnoreCase("single")) {
+      val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+        stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+      streamJob.run(table, tableName)
+    }
+    else if (streamType.equalsIgnoreCase("append")) {
+      val streamJob = new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+        stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+      streamJob.run(table, tableName)
+    }
+    else if (streamType.equalsIgnoreCase("update")) {
+      val streamJob = new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+        stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+      streamJob.run(table, tableName)
+    }
+    else throw new IOException("Unrecognized stream type: " + streamType)
+  }
+
+  /**
+   * Called by python
+   * @param table
+   * @param streamType
+   * @param configs
+   */
+  def show(table: Table, streamType: String, configs: java.util.Map[String, String]): Unit = {
+    show(table, streamType, JavaConversions.mapAsScalaMap(configs).toMap)
+  }
+
   @ZeppelinApi
   def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
 
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 9af387d..6a20aca 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -216,7 +216,7 @@ public class FlinkInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     context = getInterpreterContext();
     result = interpreter.interpret("z.show(data)", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
     List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
     assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
     assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData());
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index c01dbff..d920e7e 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -19,10 +19,6 @@ package org.apache.zeppelin.flink;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -65,6 +61,23 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
   }
 
   @Test
+  public void testSingleStreamTableApi() throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = getInterpreterContext();
+    String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\nz.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))";
+    result = flinkInterpreter.interpret(code, context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("Total Count"));
+  }
+
+  @Test
   public void testUpdateStreamSql() throws IOException, InterpreterException {
     String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
@@ -83,6 +96,23 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
   }
 
   @Test
+  public void testUpdateStreamTableApi() throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    String code = "val table = stenv.sqlQuery(\"select url, count(1) as pv from log group by url\")\nz.show(table, streamType=\"update\")";
+    result = flinkInterpreter.interpret(code, context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("url\tpv\n"));
+  }
+
+  @Test
   public void testAppendStreamSql() throws IOException, InterpreterException {
     String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
@@ -102,6 +132,25 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
   }
 
   @Test
+  public void testAppendStreamTableApi() throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+    InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+            getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    String code = "val table = stenv.sqlQuery(\"select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " +
+            "start_time, url, count(1) as pv from log group by " +
+            "TUMBLE(rowtime, INTERVAL '5' SECOND), url\")\nz.show(table, streamType=\"append\")";
+    result = flinkInterpreter.interpret(code, context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("url\tpv\n"));
+  }
+
+  @Test
   public void testStreamUDF() throws IOException, InterpreterException {
     String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
     InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
@@ -118,7 +167,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     context.getLocalProperties().put("type", "update");
     result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
             "log group by url", context);
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
 //    assertEquals(InterpreterResult.Type.TABLE,
 //            updatedOutput.toInterpreterResultMessage().getType());
 //    assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index eb678a2..1a604d3 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.flink;
 
 
 import com.google.common.io.Files;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -26,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
 import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.python.IPythonInterpreterTest;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 import static junit.framework.TestCase.assertTrue;
@@ -95,7 +98,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
   }
 
   @Test
-  public void testBatchIPyFlink() throws InterpreterException {
+  public void testBatchIPyFlink() throws InterpreterException, IOException {
     testBatchPyFlink(interpreter, flinkScalaInterpreter);
   }
 
@@ -104,8 +107,23 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     testStreamPyFlink(interpreter, flinkScalaInterpreter);
   }
 
-  public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
-    InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+  @Test
+  public void testSingleStreamTableApi() throws InterpreterException, IOException {
+    testSingleStreamTableApi(interpreter, flinkScalaInterpreter);
+  }
+
+  @Test
+  public void testUpdateStreamTableApi() throws InterpreterException, IOException {
+    testUpdateStreamTableApi(interpreter, flinkScalaInterpreter);
+  }
+
+  @Test
+  public void testAppendStreamTableApi() throws InterpreterException, IOException {
+    testAppendStreamTableApi(interpreter, flinkScalaInterpreter);
+  }
+
+  public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
+    InterpreterContext context = createInterpreterContext();
     InterpreterResult result = pyflinkInterpreter.interpret(
         "import tempfile\n" +
         "import os\n" +
@@ -135,7 +153,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     // use group by
-    context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    context = createInterpreterContext();
     result = pyflinkInterpreter.interpret(
             "import tempfile\n" +
             "import os\n" +
@@ -176,7 +194,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
             context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
-    context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    context = createInterpreterContext();
     result = pyflinkInterpreter.interpret(
             "import tempfile\n" +
             "import os\n" +
@@ -204,6 +222,28 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
             "bt_env.execute(\"batch_job3\")"
             , context);
     assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
+
+    // z.show
+    context = createInterpreterContext();
+    result = pyflinkInterpreter.interpret(
+            "import tempfile\n" +
+            "import os\n" +
+            "import shutil\n" +
+            "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+            "if os.path.exists(sink_path):\n" +
+            "    if os.path.isfile(sink_path):\n" +
+            "      os.remove(sink_path)\n" +
+            "    else:\n" +
+            "      shutil.rmtree(sink_path)\n" +
+            "b_env.set_parallelism(1)\n" +
+            "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+            "z.show(t)"
+            , context);
+    assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(new String(context.out.toByteArray()), 1, resultMessages.size());
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertEquals(new String(context.out.toByteArray()), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData());
   }
 
   @Override
@@ -223,7 +263,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
   }
 
   public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
-    InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+    InterpreterContext context = createInterpreterContext();
     InterpreterResult result = interpreter.interpret(
             "import tempfile\n" +
             "import os\n" +
@@ -253,13 +293,65 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
   }
 
-  private static InterpreterContext createInterpreterContext(
-          RemoteInterpreterEventClient mockRemoteEventClient) {
+  public static void testSingleStreamTableApi(Interpreter interpreter,
+                                              Interpreter flinkScalaInterpreter) throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala"));
+    InterpreterContext context = createInterpreterContext();
+    InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = createInterpreterContext();
+    String code = "table = st_env.sql_query('select max(rowtime), count(1) from log')\nz.show(table,stream_type='single',template = 'Total Count: {1} <br/> {0}')";
+    result = interpreter.interpret(code, context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("Total Count"));
+  }
+
+  public static void testUpdateStreamTableApi(Interpreter interpreter,
+                                              Interpreter flinkScalaInterpreter) throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala"));
+    InterpreterContext context = createInterpreterContext();
+    InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = createInterpreterContext();
+    String code = "table = st_env.sql_query('select url, count(1) as pv from log group by url')\nz.show(table,stream_type='update')";
+    result = interpreter.interpret(code, context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("url\tpv\n"));
+  }
+
+  public static void testAppendStreamTableApi(Interpreter interpreter,
+                                              Interpreter flinkScalaInterpreter) throws IOException, InterpreterException {
+    String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala"));
+    InterpreterContext context = createInterpreterContext();
+    InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    context = createInterpreterContext();
+    String code = "table = st_env.sql_query(\"select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " +
+            "start_time, url, count(1) as pv from log group by " +
+            "TUMBLE(rowtime, INTERVAL '5' SECOND), url\")\nz.show(table,stream_type='append')";
+    result = interpreter.interpret(code, context);
+    assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+    List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+    assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+    assertTrue(resultMessages.toString(),
+            resultMessages.get(0).getData().contains("url\tpv\n"));
+  }
+
+  private static InterpreterContext createInterpreterContext() {
     return InterpreterContext.builder()
         .setNoteId("noteId")
         .setParagraphId("paragraphId")
-        .setIntpEventClient(mockRemoteEventClient)
         .setInterpreterOut(new InterpreterOutput(null))
+        .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
         .build();
   }
 
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index 7bbc1dd..d6a24a2 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -31,16 +31,12 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.apache.zeppelin.python.PythonInterpreterTest;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.util.LinkedList;
 import java.util.Properties;
 
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 
 
@@ -109,11 +105,30 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
   }
 
   @Test
-  public void testPyFlink() throws InterpreterException, IOException {
+  public void testBatchPyFlink() throws InterpreterException, IOException {
     IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+  }
+
+  @Test
+  public void testStreamIPyFlink() throws InterpreterException, IOException {
     IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
   }
 
+  @Test
+  public void testSingleStreamTableApi() throws InterpreterException, IOException {
+    IPyFlinkInterpreterTest.testSingleStreamTableApi(interpreter, flinkScalaInterpreter);
+  }
+
+  @Test
+  public void testUpdateStreamTableApi() throws InterpreterException, IOException {
+    IPyFlinkInterpreterTest.testUpdateStreamTableApi(interpreter, flinkScalaInterpreter);
+  }
+
+  @Test
+  public void testAppendStreamTableApi() throws InterpreterException, IOException {
+    IPyFlinkInterpreterTest.testAppendStreamTableApi(interpreter, flinkScalaInterpreter);
+  }
+
   protected InterpreterContext getInterpreterContext() {
     appendOutput = "";
     InterpreterContext context = InterpreterContext.builder()
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 0dbcdc3..34dc05c 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -42,10 +42,8 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterGroup;
 import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
 import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
 import org.junit.After;
 import org.junit.Before;
@@ -62,7 +60,8 @@ import java.io.PrintWriter;
 import java.util.List;
 import java.util.Properties;
 
-import static org.apache.zeppelin.interpreter.InterpreterResult.*;
+import static org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import static org.apache.zeppelin.interpreter.InterpreterResult.Type;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -358,6 +357,7 @@ public abstract class SqlInterpreterTest {
 
   protected InterpreterContext getInterpreterContext() {
     return InterpreterContext.builder()
+            .setParagraphId("paragraphId")
             .setInterpreterOut(new InterpreterOutput(null))
             .setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
             .setIntpEventClient(mock(RemoteInterpreterEventClient.class))