Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/03/30 14:27:36 UTC

[zeppelin] branch master updated: [ZEPPELIN-5668] Support statement set in flink (#4332)

This is an automated email from the ASF dual-hosted git repository.

jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b60f08  [ZEPPELIN-5668] Support statement set in flink (#4332)
4b60f08 is described below

commit 4b60f08bd0ad241a7b197d7165abbb7c066ec979
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Mar 30 22:27:30 2022 +0800

    [ZEPPELIN-5668] Support statement set in flink (#4332)
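
    With this change, multiple INSERT INTO statements in a single Zeppelin
    paragraph can be compiled into one Flink job, either by setting the
    paragraph-local property runAsOne=true or by wrapping the inserts in the
    newly supported statement set syntax. A minimal sketch of that syntax,
    reusing the source/sink tables from the updated tests:

        begin statement set;
        insert into dest_table select * from source_table;
        insert into dest_table2 select * from source_table;
        end;

    Without either mechanism, each INSERT INTO still runs as its own Flink job.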
---
 .github/workflows/core.yml                         |   2 +-
 .../flink/FlinkStreamSqlInterpreterTest.java       | 168 ++++++++++++++++++++-
 .../zeppelin/flink/Flink113SqlInterpreter.java     |  21 +--
 .../zeppelin/flink/Flink114SqlInterpreter.java     |  21 +--
 4 files changed, 191 insertions(+), 21 deletions(-)

diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index d4d9b95..75cccdb 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -250,7 +250,7 @@ jobs:
           auto-activate-base: false
           use-mamba: true
       - name: run tests
-        run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }}
+        run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }}
 
   spark-integration-test:
     runs-on: ubuntu-20.04
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 9894764..49b609d 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -20,12 +20,16 @@ package org.apache.zeppelin.flink;
 import net.jodah.concurrentunit.Waiter;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterResultMessage;
 import org.junit.Test;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -38,11 +42,40 @@ import static org.junit.Assert.assertEquals;
 
 public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
 
+
+  private static class FlinkJobListener implements JobListener {
+
+    private int jobCount = 0;
+
+    public int getJobCount() {
+      return jobCount;
+    }
+
+    @Override
+    public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+      jobCount++;
+    }
+
+    @Override
+    public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+    }
+  }
+
+  private FlinkJobListener flinkJobListener;
+
   @Override
   protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) {
     return new FlinkStreamSqlInterpreter(properties);
   }
 
+  @Override
+  public void setUp() throws InterpreterException, IOException {
+    super.setUp();
+    flinkJobListener = new FlinkJobListener();
+    flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(flinkJobListener);
+  }
+
   @Test
   public void testSingleStreamSql() throws IOException, InterpreterException {
     String initStreamScalaScript = getInitStreamScript(100);
@@ -434,7 +467,65 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
   }
 
   @Test
-  public void testMultipleInsertInto() throws InterpreterException, IOException {
+  public void testMultipleInsertIntoSeparately() throws InterpreterException, IOException {
+    hiveShell.execute("create table source_table (id int, name string)");
+    hiveShell.execute("insert into source_table values(1, 'name')");
+
+    File destDir = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir);
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE TABLE dest_table (\n" +
+                    "id int,\n" +
+                    "name string" +
+                    ") WITH (\n" +
+                    "'format.field-delimiter'=',',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+                    "'format.type'='csv'\n" +
+                    ");", getInterpreterContext());
+
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    File destDir2 = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir2);
+    result = sqlInterpreter.interpret(
+            "CREATE TABLE dest_table2 (\n" +
+                    "id int,\n" +
+                    "name string" +
+                    ") WITH (\n" +
+                    "'format.field-delimiter'=',',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" +
+                    "'format.type'='csv'\n" +
+                    ");", getInterpreterContext());
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+    InterpreterContext context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "insert into dest_table select * from source_table;\n" +
+                    "insert into dest_table2 select * from source_table",
+            context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // two flink jobs are executed
+    assertEquals(2, flinkJobListener.getJobCount());
+
+    // check dest_table
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("c\n1\n", context.out.toString());
+
+    // check dest_table2
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("c\n1\n", context.out.toString());
+  }
+
+  @Test
+  public void testMultipleInsertIntoRunAsOne() throws InterpreterException, IOException {
     hiveShell.execute("create table source_table (id int, name string)");
     hiveShell.execute("insert into source_table values(1, 'name')");
 
@@ -475,18 +566,22 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
             "insert into dest_table select * from source_table;insert into dest_table2 select * from source_table",
             context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    // only one flink job is executed
+    assertEquals(1, flinkJobListener.getJobCount());
 
     // check dest_table
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals("c\n1\n", context.out.toString());
+    assertEquals(2, flinkJobListener.getJobCount());
 
     // check dest_table2
     context = getInterpreterContext();
     result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals("c\n1\n", context.out.toString());
+    assertEquals(3, flinkJobListener.getJobCount());
 
     // runAsOne won't affect the select statement.
     context = getInterpreterContext();
@@ -495,8 +590,77 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
             "select 1 as a",
             context);
     assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
     assertEquals("a\n1\n", context.out.toString());
+    assertEquals(4, flinkJobListener.getJobCount());
+  }
+
+  @Test
+  public void testStatementSet() throws IOException, InterpreterException {
+    if (flinkInterpreter.getFlinkVersion().getMinorVersion() == 12) {
+      LOGGER.warn("Skip Flink 1.12 as statement set is not supported in Flink 1.12");
+      return;
+    }
+    hiveShell.execute("create table source_table (id int, name string)");
+    hiveShell.execute("insert into source_table values(1, 'name')");
+
+    File destDir = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir);
+    InterpreterContext context = getInterpreterContext();
+    InterpreterResult result = sqlInterpreter.interpret(
+            "CREATE TABLE dest_table (\n" +
+                    "id int,\n" +
+                    "name string" +
+                    ") WITH (\n" +
+                    "'format.field-delimiter'=',',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+                    "'format.type'='csv'\n" +
+                    ");", context);
+
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+    File destDir2 = Files.createTempDirectory("flink_test").toFile();
+    FileUtils.deleteDirectory(destDir2);
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "CREATE TABLE dest_table2 (\n" +
+                    "id int,\n" +
+                    "name string" +
+                    ") WITH (\n" +
+                    "'format.field-delimiter'=',',\n" +
+                    "'connector.type'='filesystem',\n" +
+                    "'format.derive-schema'='true',\n" +
+                    "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" +
+                    "'format.type'='csv'\n" +
+                    ");", context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+    // insert into 2 sink tables in one statement set
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret(
+            "begin statement set;\n" +
+                    "insert into dest_table select * from source_table;\n" +
+                    "insert into dest_table2 select * from source_table;\n" +
+                    "end;",
+            context);
+    assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+    // only one flink job is executed
+    assertEquals(1, flinkJobListener.getJobCount());
+
+    // check dest_table
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("c\n1\n", context.out.toString());
+    assertEquals(2, flinkJobListener.getJobCount());
+
+    // check dest_table2
+    context = getInterpreterContext();
+    result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+    assertEquals("c\n1\n", context.out.toString());
+    assertEquals(3, flinkJobListener.getJobCount());
   }
 
   @Test
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
index e3fe741..fa01fac 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
@@ -164,8 +164,11 @@ public class Flink113SqlInterpreter {
   private ZeppelinContext z;
   private Parser sqlParser;
   private SqlSplitter sqlSplitter;
-  // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph.
-  private Map<String, Boolean> statementModeMap = new HashMap<>();
+  // paragraphId -> list of ModifyOperation, used for statement set in 2 syntaxes:
+  // 1. runAsOne = true
+  // 2. begin statement set;
+  //    ...
+  //    end;
   private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
   private boolean isBatch;
   private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
@@ -204,7 +207,10 @@ public class Flink113SqlInterpreter {
   public InterpreterResult runSqlList(String st, InterpreterContext context) {
     try {
       boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
-      statementModeMap.put(context.getParagraphId(), runAsOne);
+      if (runAsOne) {
+        statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+      }
+
       String jobName = context.getLocalProperties().get("jobName");
       if (StringUtils.isNotBlank(jobName)) {
         tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
@@ -259,7 +265,6 @@ public class Flink113SqlInterpreter {
       return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
     } finally {
       statementOperationsMap.remove(context.getParagraphId());
-      statementModeMap.remove(context.getParagraphId());
     }
 
     return new InterpreterResult(InterpreterResult.Code.SUCCESS);
@@ -345,10 +350,9 @@ public class Flink113SqlInterpreter {
   }
 
   private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
-    if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
-      List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+    if (statementOperationsMap.containsKey(context.getParagraphId())) {
+      List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
       modifyOperations.add(operation);
-      statementOperationsMap.put(context.getParagraphId(), modifyOperations);
     } else {
       callInserts(Collections.singletonList(operation), context);
     }
@@ -444,7 +448,7 @@ public class Flink113SqlInterpreter {
   }
 
   private void callBeginStatementSet(InterpreterContext context) throws IOException {
-    statementModeMap.put(context.getParagraphId(), true);
+    statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
   }
 
   private void callEndStatementSet(InterpreterContext context) throws IOException {
@@ -454,7 +458,6 @@ public class Flink113SqlInterpreter {
     } else {
       context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
     }
-    statementModeMap.remove(context.getParagraphId());
   }
 
   private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
index f155fcc..eb0d684 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
@@ -169,8 +169,11 @@ public class Flink114SqlInterpreter {
   private ZeppelinContext z;
   private Parser sqlParser;
   private SqlSplitter sqlSplitter;
-  // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph.
-  private Map<String, Boolean> statementModeMap = new HashMap<>();
+  // paragraphId -> list of ModifyOperation, used for statement set in 2 syntaxes:
+  // 1. runAsOne = true
+  // 2. begin statement set;
+  //    ...
+  //    end;
   private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
   private boolean isBatch;
   private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
@@ -209,7 +212,10 @@ public class Flink114SqlInterpreter {
   public InterpreterResult runSqlList(String st, InterpreterContext context) {
     try {
       boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
-      statementModeMap.put(context.getParagraphId(), runAsOne);
+      if (runAsOne) {
+        statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+      }
+
       String jobName = context.getLocalProperties().get("jobName");
       if (StringUtils.isNotBlank(jobName)) {
         tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
@@ -264,7 +270,6 @@ public class Flink114SqlInterpreter {
       return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
     } finally {
       statementOperationsMap.remove(context.getParagraphId());
-      statementModeMap.remove(context.getParagraphId());
     }
 
     return new InterpreterResult(InterpreterResult.Code.SUCCESS);
@@ -359,10 +364,9 @@ public class Flink114SqlInterpreter {
   }
 
   private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
-    if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
-      List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+    if (statementOperationsMap.containsKey(context.getParagraphId())) {
+      List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
       modifyOperations.add(operation);
-      statementOperationsMap.put(context.getParagraphId(), modifyOperations);
     } else {
       callInserts(Collections.singletonList(operation), context);
     }
@@ -472,7 +476,7 @@ public class Flink114SqlInterpreter {
   }
 
   private void callBeginStatementSet(InterpreterContext context) throws IOException {
-    statementModeMap.put(context.getParagraphId(), true);
+    statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
   }
 
   private void callEndStatementSet(InterpreterContext context) throws IOException {
@@ -482,7 +486,6 @@ public class Flink114SqlInterpreter {
     } else {
       context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
     }
-    statementModeMap.remove(context.getParagraphId());
   }
 
   private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {