You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/22 02:11:52 UTC
[zeppelin] branch master updated: [ZEPPELIN-5280]. Use update as
the default type of %flink.ssql
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new a3f0dd5 [ZEPPELIN-5280]. Use update as the default type of %flink.ssql
a3f0dd5 is described below
commit a3f0dd5ef062d1a8e54fc33f6f95b4609639d1b1
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Mar 6 18:32:25 2021 +0800
[ZEPPELIN-5280]. Use update as the default type of %flink.ssql
### What is this PR for?
Trivial PR to make `update` the default type of `%flink.ssql`, so that users don't need to specify the type in most cases.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5280
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #4181 from zjffdu/ZEPPELIN-5280 and squashes the following commits:
e473fb8ed3 [Jeff Zhang] [ZEPPELIN-5280]. Use update as the default type of %flink.ssql
---
.../org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java | 5 +----
.../java/org/apache/zeppelin/flink/FlinkInterpreterTest.java | 4 ----
.../apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java | 10 ----------
.../org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java | 4 ----
4 files changed, 1 insertion(+), 22 deletions(-)
diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index dd85272..60c5c5a 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -56,10 +56,7 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
@Override
public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
- String streamType = context.getLocalProperties().get("type");
- if (streamType == null) {
- throw new IOException("type must be specified for stream sql");
- }
+ String streamType = context.getLocalProperties().getOrDefault("type", "update");
if (streamType.equalsIgnoreCase("single")) {
SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
flinkInterpreter.getStreamExecutionEnvironment(),
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index ceced31..8649138 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -309,7 +309,6 @@ public class FlinkInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
InterpreterResult result2 = interpreter.interpret(
"val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
"log group by url\")\nz.show(table, streamType=\"update\")", context);
@@ -330,7 +329,6 @@ public class FlinkInterpreterTest {
Thread.sleep(20 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
interpreter.cancel(context);
waiter.await(10 * 1000);
// resume job
@@ -356,7 +354,6 @@ public class FlinkInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
@@ -380,7 +377,6 @@ public class FlinkInterpreterTest {
Thread.sleep(20 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 382a9b9..98604cd 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -95,7 +95,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
result = sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -171,7 +170,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
"log group by url", context);
waiter.assertTrue(context.out.toString().contains("Job was cancelled"));
@@ -189,7 +187,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
Thread.sleep(10 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
sqlInterpreter.cancel(context);
waiter.await(10 * 1000);
// resume job
@@ -215,7 +212,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
@@ -238,7 +234,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
Thread.sleep(10 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");
@@ -267,7 +262,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
@@ -288,7 +282,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
Thread.sleep(10 * 1000);
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");
@@ -322,7 +315,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
context.getLocalProperties().put(JobManager.RESUME_FROM_SAVEPOINT, "true");
@@ -349,7 +341,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
InterpreterContext context = getInterpreterContext();
- context.getLocalProperties().put("type", "update");
result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
"log group by url", context);
assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -442,7 +433,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
// runAsOne won't affect the select statement.
context = getInterpreterContext();
context.getLocalProperties().put("runAsOne", "true");
- context.getLocalProperties().put("type", "update");
result = sqlInterpreter.interpret(
"select 1",
context);
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index fb562cd..e1536bd 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -382,7 +382,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = createInterpreterContext();
- context.getLocalProperties().put("type", "update");
InterpreterResult result2 = interpreter.interpret(
"table = st_env.sql_query('select url, count(1) as pv from " +
"log group by url')\nz.show(table, stream_type='update')", context);
@@ -402,7 +401,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
Thread.sleep(20 * 1000);
InterpreterContext context = createInterpreterContext();
- context.getLocalProperties().put("type", "update");
interpreter.cancel(context);
waiter.await(10 * 1000);
// resume job
@@ -426,7 +424,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
Thread thread = new Thread(() -> {
try {
InterpreterContext context = createInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "1");
context.getLocalProperties().put("maxParallelism", "10");
@@ -449,7 +446,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
Thread.sleep(20 * 1000);
InterpreterContext context = createInterpreterContext();
- context.getLocalProperties().put("type", "update");
context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
context.getLocalProperties().put("parallelism", "2");
context.getLocalProperties().put("maxParallelism", "10");