You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/07/22 02:11:52 UTC

[zeppelin] branch master updated: [ZEPPELIN-5280]. Use update as the default type of %flink.ssql

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new a3f0dd5  [ZEPPELIN-5280]. Use update as the default type of %flink.ssql
a3f0dd5 is described below

commit a3f0dd5ef062d1a8e54fc33f6f95b4609639d1b1
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Sat Mar 6 18:32:25 2021 +0800

    [ZEPPELIN-5280]. Use update as the default type of %flink.ssql
    
    ### What is this PR for?
    
    Trivial PR to make update the default type of `%flink.ssql`, so that users don't need to specify the type in most cases.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5280
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4181 from zjffdu/ZEPPELIN-5280 and squashes the following commits:
    
    e473fb8ed3 [Jeff Zhang] [ZEPPELIN-5280]. Use update as the default type of %flink.ssql
---
 .../org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java   |  5 +----
 .../java/org/apache/zeppelin/flink/FlinkInterpreterTest.java   |  4 ----
 .../apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java   | 10 ----------
 .../org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java     |  4 ----
 4 files changed, 1 insertion(+), 22 deletions(-)

diff --git a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
index dd85272..60c5c5a 100644
--- a/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
+++ b/flink/flink-scala-parent/src/main/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreter.java
@@ -56,10 +56,7 @@ public class FlinkStreamSqlInterpreter extends FlinkSqlInterrpeter {
 
   @Override
   public void callInnerSelect(String sql, InterpreterContext context) throws IOException {
-    String streamType = context.getLocalProperties().get("type");
-    if (streamType == null) {
-      throw new IOException("type must be specified for stream sql");
-    }
+    String streamType = context.getLocalProperties().getOrDefault("type", "update");
     if (streamType.equalsIgnoreCase("single")) {
       SingleRowStreamSqlJob streamJob = new SingleRowStreamSqlJob(
               flinkInterpreter.getStreamExecutionEnvironment(),
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
index ceced31..8649138 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkInterpreterTest.java
@@ -309,7 +309,6 @@ public class FlinkInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         InterpreterResult result2 = interpreter.interpret(
                 "val table = stenv.sqlQuery(\"select url, count(1) as pv from " +
                 "log group by url\")\nz.show(table, streamType=\"update\")", context);
@@ -330,7 +329,6 @@ public class FlinkInterpreterTest {
     Thread.sleep(20 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     interpreter.cancel(context);
     waiter.await(10 * 1000);
     // resume job
@@ -356,7 +354,6 @@ public class FlinkInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
@@ -380,7 +377,6 @@ public class FlinkInterpreterTest {
     Thread.sleep(20 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 382a9b9..98604cd 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -95,7 +95,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     result = sqlInterpreter.interpret("select url, count(1) as pv from " +
             "log group by url", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
@@ -171,7 +170,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         InterpreterResult result2 = sqlInterpreter.interpret("select url, count(1) as pv from " +
                 "log group by url", context);
         waiter.assertTrue(context.out.toString().contains("Job was cancelled"));
@@ -189,7 +187,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     Thread.sleep(10 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     sqlInterpreter.cancel(context);
     waiter.await(10 * 1000);
     // resume job
@@ -215,7 +212,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
@@ -238,7 +234,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     Thread.sleep(10 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
@@ -267,7 +262,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = getInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
@@ -288,7 +282,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     Thread.sleep(10 * 1000);
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");
@@ -322,7 +315,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     context.getLocalProperties().put("parallelism", "1");
     context.getLocalProperties().put("maxParallelism", "10");
     context.getLocalProperties().put(JobManager.RESUME_FROM_SAVEPOINT, "true");
@@ -349,7 +341,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
 
     InterpreterContext context = getInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     result = sqlInterpreter.interpret("select myupper(url), count(1) as pv from " +
             "log group by url", context);
     assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
@@ -442,7 +433,6 @@ public class FlinkStreamSqlInterpreterTest extends SqlInterpreterTest {
     // runAsOne won't affect the select statement.
     context = getInterpreterContext();
     context.getLocalProperties().put("runAsOne", "true");
-    context.getLocalProperties().put("type", "update");
     result = sqlInterpreter.interpret(
             "select 1",
             context);
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
index fb562cd..e1536bd 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/IPyFlinkInterpreterTest.java
@@ -382,7 +382,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = createInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         InterpreterResult result2 = interpreter.interpret(
                 "table = st_env.sql_query('select url, count(1) as pv from " +
                         "log group by url')\nz.show(table, stream_type='update')", context);
@@ -402,7 +401,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     Thread.sleep(20 * 1000);
 
     InterpreterContext context = createInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     interpreter.cancel(context);
     waiter.await(10 * 1000);
     // resume job
@@ -426,7 +424,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     Thread thread = new Thread(() -> {
       try {
         InterpreterContext context = createInterpreterContext();
-        context.getLocalProperties().put("type", "update");
         context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
         context.getLocalProperties().put("parallelism", "1");
         context.getLocalProperties().put("maxParallelism", "10");
@@ -449,7 +446,6 @@ public class IPyFlinkInterpreterTest extends IPythonInterpreterTest {
     Thread.sleep(20 * 1000);
 
     InterpreterContext context = createInterpreterContext();
-    context.getLocalProperties().put("type", "update");
     context.getLocalProperties().put("savePointDir", savePointDir.getAbsolutePath());
     context.getLocalProperties().put("parallelism", "2");
     context.getLocalProperties().put("maxParallelism", "10");