You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/04/11 10:08:28 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-4740]. Display
streaming data in flink table api
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 9a3947c [ZEPPELIN-4740]. Display streaming data in flink table api
9a3947c is described below
commit 9a3947cdd07396000b50081074f2c72e44e88a7a
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Apr 9 17:20:05 2020 +0800
[ZEPPELIN-4740]. Display streaming data in flink table api
### What is this PR for?
This PR is to allow users to display streaming data via the Flink table API, just like displaying streaming data in stream SQL (%flink.ssql). It is implemented in both Scala and PyFlink.
Here's one example in flink scala table api
```
val table = stenv.from("cdn_access_log")
.select("uuid, ip_to_province(client_ip) as province, response_size, request_time")
.groupBy("province")
.select( "province, count(uuid) as access_count, sum(response_size) as total_download, sum(response_size) * 1.0 / sum(request_time) as download_speed")
z.show(table, streamType="update")
```
### What type of PR is it?
[Feature ]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4740
### How should this be tested?
Unit tests are added, and it is also verified manually.
### Screenshots (if appropriate)
![image](https://user-images.githubusercontent.com/164491/78974396-75a59580-7b44-11ea-91be-e962d630e739.png)
### Questions:
* Do the license files need to be updated? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3729 from zjffdu/ZEPPELIN-4740 and squashes the following commits:
bcd25978e [Jeff Zhang] [ZEPPELIN-4740]. Display streaming data in flink table api
(cherry picked from commit 80a3856bc722f57e4603e276e88920633a15248f)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
flink/pom.xml | 2 +-
.../apache/zeppelin/flink/FlinkInterpreter.java | 4 +-
.../java/org/apache/zeppelin/flink/JobManager.java | 6 ++
.../zeppelin/flink/sql/AbstractStreamSqlJob.java | 11 ++-
.../src/main/resources/python/zeppelin_ipyflink.py | 5 +-
.../src/main/resources/python/zeppelin_pyflink.py | 5 +-
.../zeppelin/flink/FlinkScalaInterpreter.scala | 20 +++-
.../zeppelin/flink/FlinkZeppelinContext.scala | 53 ++++++++--
.../zeppelin/flink/FlinkInterpreterTest.java | 2 +-
.../flink/FlinkStreamSqlInterpreterTest.java | 59 ++++++++++-
.../zeppelin/flink/IPyFlinkInterpreterTest.java | 110 +++++++++++++++++++--
.../zeppelin/flink/PyFlinkInterpreterTest.java | 25 ++++-
.../apache/zeppelin/flink/SqlInterpreterTest.java | 6 +-
13 files changed, 265 insertions(+), 43 deletions(-)
diff --git a/flink/pom.xml b/flink/pom.xml
index 58665bb..d74c0d9 100644
--- a/flink/pom.xml
+++ b/flink/pom.xml
@@ -634,7 +634,7 @@
<!-- set sun.zip.disableMemoryMapping=true because of
https://blogs.oracle.com/poonam/crashes-in-zipgetentry
https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8191484 -->
- <argLine>-Xmx3072m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
+ <argLine>-Xmx4096m -XX:MaxMetaspaceSize=512m -Dsun.zip.disableMemoryMapping=true</argLine>
<environmentVariables>
<FLINK_HOME>${project.build.directory}/flink-${flink.version}</FLINK_HOME>
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
index af4de3d..a9afc1d 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/FlinkInterpreter.java
@@ -110,7 +110,7 @@ public class FlinkInterpreter extends Interpreter {
}
StreamTableEnvironment getStreamTableEnvironment() {
- return this.innerIntp.getStreamTableEnvironment();
+ return this.innerIntp.getStreamTableEnvironment("blink");
}
org.apache.flink.table.api.TableEnvironment getJavaBatchTableEnvironment(String planner) {
@@ -122,7 +122,7 @@ public class FlinkInterpreter extends Interpreter {
}
TableEnvironment getBatchTableEnvironment() {
- return this.innerIntp.getBatchTableEnvironment();
+ return this.innerIntp.getBatchTableEnvironment("blink");
}
JobManager getJobManager() {
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
index 0aab84f..480f6bb 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/JobManager.java
@@ -140,6 +140,12 @@ public class JobManager {
jobProgressPoller.interrupt();
}
+ public void shutdown() {
+ for (FlinkJobProgressPoller jobProgressPoller : jobProgressPollerMap.values()) {
+ jobProgressPoller.cancel();
+ }
+ }
+
class FlinkJobProgressPoller extends Thread {
private String flinkWebUI;
diff --git a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
index 30c4f45..9469b89 100644
--- a/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
+++ b/flink/src/main/java/org/apache/zeppelin/flink/sql/AbstractStreamSqlJob.java
@@ -96,11 +96,15 @@ public abstract class AbstractStreamSqlJob {
protected abstract String getType();
public InterpreterResult run(String st) throws IOException {
+ Table table = stenv.sqlQuery(st);
+ String tableName = st + "_" + SQL_INDEX.getAndIncrement();
+ return run(table, tableName);
+ }
+
+ public InterpreterResult run(Table table, String tableName) throws IOException {
try {
int parallelism = Integer.parseInt(context.getLocalProperties()
.getOrDefault("parallelism", defaultParallelism + ""));
-
- Table table = stenv.sqlQuery(st);
this.schema = removeTimeAttributes(table.getSchema());
checkTableSchema(schema);
@@ -132,7 +136,6 @@ public abstract class AbstractStreamSqlJob {
try {
stenv.useCatalog("default_catalog");
stenv.useDatabase("default_database");
- String tableName = st + "_" + SQL_INDEX.getAndIncrement();
stenv.registerTableSink(tableName, collectTableSink);
table.insertInto(new StreamQueryConfig(), tableName);
} finally {
@@ -149,7 +152,7 @@ public abstract class AbstractStreamSqlJob {
retrievalThread.start();
LOGGER.info("Run job without savePointPath, " + ", parallelism: " + parallelism);
- stenv.execute(st);
+ stenv.execute(tableName);
LOGGER.info("Flink Job is finished");
// wait for retrieve thread consume all data
LOGGER.info("Waiting for retrieve thread to be done");
diff --git a/flink/src/main/resources/python/zeppelin_ipyflink.py b/flink/src/main/resources/python/zeppelin_ipyflink.py
index 0c1f2e7..fe94c9f 100644
--- a/flink/src/main/resources/python/zeppelin_ipyflink.py
+++ b/flink/src/main/resources/python/zeppelin_ipyflink.py
@@ -60,7 +60,10 @@ class IPyFlinkZeppelinContext(PyZeppelinContext):
def show(self, obj, **kwargs):
from pyflink.table import Table
if isinstance(obj, Table):
- print(self.z.showData(obj._j_table))
+ if 'stream_type' in kwargs:
+ self.z.show(obj._j_table, kwargs['stream_type'], kwargs)
+ else:
+ print(self.z.showData(obj._j_table))
else:
super(IPyFlinkZeppelinContext, self).show(obj, **kwargs)
diff --git a/flink/src/main/resources/python/zeppelin_pyflink.py b/flink/src/main/resources/python/zeppelin_pyflink.py
index a2bdaa0..8a401b2 100644
--- a/flink/src/main/resources/python/zeppelin_pyflink.py
+++ b/flink/src/main/resources/python/zeppelin_pyflink.py
@@ -52,7 +52,10 @@ class PyFlinkZeppelinContext(PyZeppelinContext):
def show(self, obj, **kwargs):
from pyflink.table import Table
if isinstance(obj, Table):
- print(self.z.showData(obj._j_table))
+ if 'stream_type' in kwargs:
+ self.z.show(obj._j_table, kwargs['stream_type'], kwargs)
+ else:
+ print(self.z.showData(obj._j_table))
else:
super(PyFlinkZeppelinContext, self).show(obj, **kwargs)
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
index 32962da..b45637c 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkScalaInterpreter.scala
@@ -360,7 +360,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
flinkILoop.interpret("import org.apache.flink.table.functions.AggregateFunction")
flinkILoop.interpret("import org.apache.flink.table.functions.TableFunction")
- this.z = new FlinkZeppelinContext(this.btenv, this.btenv_2, new InterpreterHookRegistry(),
+ this.z = new FlinkZeppelinContext(this, new InterpreterHookRegistry(),
Integer.parseInt(properties.getProperty("zeppelin.flink.maxResult", "1000")))
val modifiers = new java.util.ArrayList[String]()
modifiers.add("@transient")
@@ -638,7 +638,6 @@ class FlinkScalaInterpreter(val properties: Properties) {
LOGGER.info("Don't close the Remote FlinkCluster")
}
}
-
} else {
LOGGER.info("Keep cluster alive when closing interpreter")
}
@@ -647,6 +646,9 @@ class FlinkScalaInterpreter(val properties: Properties) {
flinkILoop.closeInterpreter()
flinkILoop = null
}
+ if (jobManager != null) {
+ jobManager.shutdown()
+ }
}
private def cleanupStagingDirInternal(appId: ApplicationId): Unit = {
@@ -666,9 +668,19 @@ class FlinkScalaInterpreter(val properties: Properties) {
def getStreamExecutionEnvironment(): StreamExecutionEnvironment = this.senv
- def getBatchTableEnvironment(): TableEnvironment = this.btenv
+ def getBatchTableEnvironment(planner: String = "blink"): TableEnvironment = {
+ if (planner == "blink")
+ this.btenv
+ else
+ this.btenv_2
+ }
- def getStreamTableEnvironment(): StreamTableEnvironment = this.stenv
+ def getStreamTableEnvironment(planner: String = "blink"): StreamTableEnvironment = {
+ if (planner == "blink")
+ this.stenv
+ else
+ this.stenv_2
+ }
def getJavaBatchTableEnvironment(planner: String): TableEnvironment = {
if (planner == "blink") {
diff --git a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
index 4f71622..03be11b 100644
--- a/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
+++ b/flink/src/main/scala/org/apache/zeppelin/flink/FlinkZeppelinContext.scala
@@ -18,16 +18,21 @@
package org.apache.zeppelin.flink
+import java.io.IOException
+import java.util.concurrent.atomic.AtomicInteger
+
+import com.google.common.collect.Maps
import org.apache.flink.api.scala.DataSet
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.internal.TableImpl
import org.apache.flink.table.api.{Table, TableEnvironment, TableUtils}
-import org.apache.flink.table.api.scala.BatchTableEnvironment
+import org.apache.flink.table.api.scala.{BatchTableEnvironment, StreamTableEnvironment}
import org.apache.flink.types.Row
import org.apache.flink.util.StringUtils
import org.apache.zeppelin.annotation.ZeppelinApi
import org.apache.zeppelin.display.AngularObjectWatcher
import org.apache.zeppelin.display.ui.OptionInput.ParamOption
+import org.apache.zeppelin.flink.sql.{AppendStreamSqlJob, SingleRowStreamSqlJob, UpdateStreamSqlJob}
import org.apache.zeppelin.interpreter.{InterpreterContext, InterpreterHookRegistry, ResultMessages, ZeppelinContext}
import org.apache.zeppelin.tabledata.TableDataUtils
@@ -37,11 +42,11 @@ import scala.collection.{JavaConversions, Seq}
/**
* ZeppelinContext for Flink
*/
-class FlinkZeppelinContext(val btenv: TableEnvironment,
- val btenv_2: TableEnvironment,
+class FlinkZeppelinContext(val flinkInterpreter: FlinkScalaInterpreter,
val hooks2: InterpreterHookRegistry,
val maxResult2: Int) extends ZeppelinContext(hooks2, maxResult2) {
+ private val SQL_INDEX = new AtomicInteger(0)
private var currentSql: String = _
private val interpreterClassMap = Map(
@@ -98,10 +103,10 @@ class FlinkZeppelinContext(val btenv: TableEnvironment,
override def showData(obj: Any, maxResult: Int): String = {
if (obj.isInstanceOf[DataSet[_]]) {
val ds = obj.asInstanceOf[DataSet[_]]
- val env = btenv_2.asInstanceOf[BatchTableEnvironment]
- val table = env.fromDataSet(ds)
+ val btenv = flinkInterpreter.getBatchTableEnvironment("flink").asInstanceOf[BatchTableEnvironment]
+ val table = btenv.fromDataSet(ds)
val columnNames: Array[String] = table.getSchema.getFieldNames
- val dsRows: DataSet[Row] = env.toDataSet[Row](table)
+ val dsRows: DataSet[Row] = btenv.toDataSet[Row](table)
showTable(columnNames, dsRows.first(maxResult + 1).collect())
} else if (obj.isInstanceOf[Table]) {
val rows = JavaConversions.asScalaBuffer(TableUtils.collectToList(obj.asInstanceOf[TableImpl])).toSeq
@@ -114,7 +119,8 @@ class FlinkZeppelinContext(val btenv: TableEnvironment,
def showFlinkTable(table: Table): String = {
val columnNames: Array[String] = table.getSchema.getFieldNames
- val dsRows: DataSet[Row] = btenv.asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
+ val dsRows: DataSet[Row] = flinkInterpreter.getJavaBatchTableEnvironment("flink")
+ .asInstanceOf[BatchTableEnvironment].toDataSet[Row](table)
showTable(columnNames, dsRows.first(maxResult + 1).collect())
}
@@ -124,6 +130,39 @@ class FlinkZeppelinContext(val btenv: TableEnvironment,
showTable(columnNames, rows)
}
+ def show(table: Table, streamType: String, configs: Map[String, String] = Map.empty): Unit = {
+ val stenv = flinkInterpreter.getStreamTableEnvironment()
+ val context = InterpreterContext.get()
+ configs.foreach(e => context.getLocalProperties.put(e._1, e._2))
+ val tableName = context.getParagraphId.replace("-", "_") + "_" + SQL_INDEX.getAndIncrement()
+ if (streamType.equalsIgnoreCase("single")) {
+ val streamJob = new SingleRowStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+ stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+ streamJob.run(table, tableName)
+ }
+ else if (streamType.equalsIgnoreCase("append")) {
+ val streamJob = new AppendStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+ stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+ streamJob.run(table, tableName)
+ }
+ else if (streamType.equalsIgnoreCase("update")) {
+ val streamJob = new UpdateStreamSqlJob(flinkInterpreter.getStreamExecutionEnvironment,
+ stenv, flinkInterpreter.getJobManager, context, flinkInterpreter.getDefaultParallelism)
+ streamJob.run(table, tableName)
+ }
+ else throw new IOException("Unrecognized stream type: " + streamType)
+ }
+
+ /**
+ * Called by python
+ * @param table
+ * @param streamType
+ * @param configs
+ */
+ def show(table: Table, streamType: String, configs: java.util.Map[String, String]): Unit = {
+ show(table, streamType, JavaConversions.mapAsScalaMap(configs).toMap)
+ }
+
@ZeppelinApi
def select(name: String, options: Seq[(Any, String)]): Any = select(name, null, options)
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index 9af387d..6a20aca 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -216,7 +216,7 @@ public class FlinkInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
context = getInterpreterContext();
result = interpreter.interpret("z.show(data)", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
assertEquals("_1\t_2\n1\tjeff\n2\tandy\n3\tjames\n", resultMessages.get(0).getData());
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index c01dbff..d920e7e 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -19,10 +19,6 @@ package org.apache.zeppelin.flink;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.table.api.EnvironmentSettings;
-import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
@@ -65,6 +61,23 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
}
@Test
+ public void testSingleStreamTableApi() throws IOException, InterpreterException {
+ String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+ InterpreterContext context = getInterpreterContext();
+ InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = getInterpreterContext();
+ String code = "val table = stenv.sqlQuery(\"select max(rowtime), count(1) from log\")\nz.show(table,streamType=\"single\", configs = Map(\"template\" -> \"Total Count: {1} <br/> {0}\"))";
+ result = flinkInterpreter.interpret(code, context);
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
+ assertTrue(resultMessages.toString(),
+ resultMessages.get(0).getData().contains("Total Count"));
+ }
+
+ @Test
public void testUpdateStreamSql() throws IOException, InterpreterException {
String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
@@ -83,6 +96,23 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
}
@Test
+ public void testUpdateStreamTableApi() throws IOException, InterpreterException {
+ String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+ InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ InterpreterContext context = getInterpreterContext();
+ String code = "val table = stenv.sqlQuery(\"select url, count(1) as pv from log group by url\")\nz.show(table, streamType=\"update\")";
+ result = flinkInterpreter.interpret(code, context);
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertTrue(resultMessages.toString(),
+ resultMessages.get(0).getData().contains("url\tpv\n"));
+ }
+
+ @Test
public void testAppendStreamSql() throws IOException, InterpreterException {
String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
@@ -102,6 +132,25 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
}
@Test
+ public void testAppendStreamTableApi() throws IOException, InterpreterException {
+ String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
+ InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
+ getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ InterpreterContext context = getInterpreterContext();
+ String code = "val table = stenv.sqlQuery(\"select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " +
+ "start_time, url, count(1) as pv from log group by " +
+ "TUMBLE(rowtime, INTERVAL '5' SECOND), url\")\nz.show(table, streamType=\"append\")";
+ result = flinkInterpreter.interpret(code, context);
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertTrue(resultMessages.toString(),
+ resultMessages.get(0).getData().contains("url\tpv\n"));
+ }
+
+ @Test
public void testStreamUDF() throws IOException, InterpreterException {
String initStreamScalaScript = IOUtils.toString(getClass().getResource("/init_stream.scala"));
InterpreterResult result = flinkInterpreter.interpret(initStreamScalaScript,
@@ -118,7 +167,7 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
context.getLocalProperties().put("type", "update");
result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
"log group by url", context);
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
// assertEquals(InterpreterResult.Type.TABLE,
// updatedOutput.toInterpreterResultMessage().getType());
// assertTrue(updatedOutput.toInterpreterResultMessage().getData(),
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index eb678a2..1a604d3 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -19,6 +19,7 @@ package org.apache.zeppelin.flink;
import com.google.common.io.Files;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
@@ -26,6 +27,7 @@ import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
import org.apache.zeppelin.interpreter.InterpreterResult;
+import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.IPythonInterpreterTest;
@@ -35,6 +37,7 @@ import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
import static junit.framework.TestCase.assertTrue;
@@ -95,7 +98,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
}
@Test
- public void testBatchIPyFlink() throws InterpreterException {
+ public void testBatchIPyFlink() throws InterpreterException, IOException {
testBatchPyFlink(interpreter, flinkScalaInterpreter);
}
@@ -104,8 +107,23 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
testStreamPyFlink(interpreter, flinkScalaInterpreter);
}
- public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException {
- InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+ @Test
+ public void testSingleStreamTableApi() throws InterpreterException, IOException {
+ testSingleStreamTableApi(interpreter, flinkScalaInterpreter);
+ }
+
+ @Test
+ public void testUpdateStreamTableApi() throws InterpreterException, IOException {
+ testUpdateStreamTableApi(interpreter, flinkScalaInterpreter);
+ }
+
+ @Test
+ public void testAppendStreamTableApi() throws InterpreterException, IOException {
+ testAppendStreamTableApi(interpreter, flinkScalaInterpreter);
+ }
+
+ public static void testBatchPyFlink(Interpreter pyflinkInterpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
+ InterpreterContext context = createInterpreterContext();
InterpreterResult result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
@@ -135,7 +153,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// use group by
- context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+ context = createInterpreterContext();
result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
@@ -176,7 +194,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+ context = createInterpreterContext();
result = pyflinkInterpreter.interpret(
"import tempfile\n" +
"import os\n" +
@@ -204,6 +222,28 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
"bt_env.execute(\"batch_job3\")"
, context);
assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
+
+ // z.show
+ context = createInterpreterContext();
+ result = pyflinkInterpreter.interpret(
+ "import tempfile\n" +
+ "import os\n" +
+ "import shutil\n" +
+ "sink_path = tempfile.gettempdir() + '/streaming.csv'\n" +
+ "if os.path.exists(sink_path):\n" +
+ " if os.path.isfile(sink_path):\n" +
+ " os.remove(sink_path)\n" +
+ " else:\n" +
+ " shutil.rmtree(sink_path)\n" +
+ "b_env.set_parallelism(1)\n" +
+ "t = bt_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])\n" +
+ "z.show(t)"
+ , context);
+ assertEquals(result.toString(),InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(new String(context.out.toByteArray()), 1, resultMessages.size());
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertEquals(new String(context.out.toByteArray()), "a\tb\tc\n1\thi\thello\n2\thi\thello\n", resultMessages.get(0).getData());
}
@Override
@@ -223,7 +263,7 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
}
public static void testStreamPyFlink(Interpreter interpreter, Interpreter flinkScalaInterpreter) throws InterpreterException, IOException {
- InterpreterContext context = createInterpreterContext(mock(RemoteInterpreterEventClient.class));
+ InterpreterContext context = createInterpreterContext();
InterpreterResult result = interpreter.interpret(
"import tempfile\n" +
"import os\n" +
@@ -253,13 +293,65 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
}
- private static InterpreterContext createInterpreterContext(
- RemoteInterpreterEventClient mockRemoteEventClient) {
+ public static void testSingleStreamTableApi(Interpreter interpreter,
+ Interpreter flinkScalaInterpreter) throws IOException, InterpreterException {
+ String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala"));
+ InterpreterContext context = createInterpreterContext();
+ InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = createInterpreterContext();
+ String code = "table = st_env.sql_query('select max(rowtime), count(1) from log')\nz.show(table,stream_type='single',template = 'Total Count: {1} <br/> {0}')";
+ result = interpreter.interpret(code, context);
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.HTML, resultMessages.get(0).getType());
+ assertTrue(resultMessages.toString(),
+ resultMessages.get(0).getData().contains("Total Count"));
+ }
+
+ public static void testUpdateStreamTableApi(Interpreter interpreter,
+ Interpreter flinkScalaInterpreter) throws IOException, InterpreterException {
+ String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala"));
+ InterpreterContext context = createInterpreterContext();
+ InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = createInterpreterContext();
+ String code = "table = st_env.sql_query('select url, count(1) as pv from log group by url')\nz.show(table,stream_type='update')";
+ result = interpreter.interpret(code, context);
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertTrue(resultMessages.toString(),
+ resultMessages.get(0).getData().contains("url\tpv\n"));
+ }
+
+ public static void testAppendStreamTableApi(Interpreter interpreter,
+ Interpreter flinkScalaInterpreter) throws IOException, InterpreterException {
+ String initStreamScalaScript = IOUtils.toString(IPyFlinkInterpreterTest.class.getResource("/init_stream.scala"));
+ InterpreterContext context = createInterpreterContext();
+ InterpreterResult result = flinkScalaInterpreter.interpret(initStreamScalaScript, context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ context = createInterpreterContext();
+ String code = "table = st_env.sql_query(\"select TUMBLE_START(rowtime, INTERVAL '5' SECOND) as " +
+ "start_time, url, count(1) as pv from log group by " +
+ "TUMBLE(rowtime, INTERVAL '5' SECOND), url\")\nz.show(table,stream_type='append')";
+ result = interpreter.interpret(code, context);
+ assertEquals(new String(context.out.toByteArray()), InterpreterResult.Code.SUCCESS, result.code());
+ List<InterpreterResultMessage> resultMessages = context.out.toInterpreterResultMessage();
+ assertEquals(InterpreterResult.Type.TABLE, resultMessages.get(0).getType());
+ assertTrue(resultMessages.toString(),
+ resultMessages.get(0).getData().contains("url\tpv\n"));
+ }
+
+ private static InterpreterContext createInterpreterContext() {
return InterpreterContext.builder()
.setNoteId("noteId")
.setParagraphId("paragraphId")
- .setIntpEventClient(mockRemoteEventClient)
.setInterpreterOut(new InterpreterOutput(null))
+ .setIntpEventClient(mock(RemoteInterpreterEventClient.class))
.build();
}
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
index 7bbc1dd..d6a24a2 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/PyFlinkInterpreterTest.java
@@ -31,16 +31,12 @@ import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.LazyOpenInterpreter;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.apache.zeppelin.python.PythonInterpreterTest;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.LinkedList;
import java.util.Properties;
-import static junit.framework.TestCase.assertTrue;
-import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -109,11 +105,30 @@ public class PyFlinkInterpreterTest extends PythonInterpreterTest {
}
@Test
- public void testPyFlink() throws InterpreterException, IOException {
+ public void testBatchPyFlink() throws InterpreterException, IOException {
IPyFlinkInterpreterTest.testBatchPyFlink(interpreter, flinkScalaInterpreter);
+ }
+
+ @Test
+ public void testStreamIPyFlink() throws InterpreterException, IOException {
IPyFlinkInterpreterTest.testStreamPyFlink(interpreter, flinkScalaInterpreter);
}
+ @Test
+ public void testSingleStreamTableApi() throws InterpreterException, IOException {
+ IPyFlinkInterpreterTest.testSingleStreamTableApi(interpreter, flinkScalaInterpreter);
+ }
+
+ @Test
+ public void testUpdateStreamTableApi() throws InterpreterException, IOException {
+ IPyFlinkInterpreterTest.testUpdateStreamTableApi(interpreter, flinkScalaInterpreter);
+ }
+
+ @Test
+ public void testAppendStreamTableApi() throws InterpreterException, IOException {
+ IPyFlinkInterpreterTest.testAppendStreamTableApi(interpreter, flinkScalaInterpreter);
+ }
+
protected InterpreterContext getInterpreterContext() {
appendOutput = "";
InterpreterContext context = InterpreterContext.builder()
diff --git a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
index 0dbcdc3..34dc05c 100644
--- a/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
+++ b/flink/src/test/java/org/apache/zeppelin/flink/SqlInterpreterTest.java
@@ -42,10 +42,8 @@ import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterGroup;
import org.apache.zeppelin.interpreter.InterpreterOutput;
-import org.apache.zeppelin.interpreter.InterpreterOutputListener;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
-import org.apache.zeppelin.interpreter.InterpreterResultMessageOutput;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterEventClient;
import org.junit.After;
import org.junit.Before;
@@ -62,7 +60,8 @@ import java.io.PrintWriter;
import java.util.List;
import java.util.Properties;
-import static org.apache.zeppelin.interpreter.InterpreterResult.*;
+import static org.apache.zeppelin.interpreter.InterpreterResult.Code;
+import static org.apache.zeppelin.interpreter.InterpreterResult.Type;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@@ -358,6 +357,7 @@ public abstract class SqlInterpreterTest {
protected InterpreterContext getInterpreterContext() {
return InterpreterContext.builder()
+ .setParagraphId("paragraphId")
.setInterpreterOut(new InterpreterOutput(null))
.setAngularObjectRegistry(new AngularObjectRegistry("flink", null))
.setIntpEventClient(mock(RemoteInterpreterEventClient.class))