You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@zeppelin.apache.org by jo...@apache.org on 2022/03/30 14:27:36 UTC
[zeppelin] branch master updated: [ZEPPELIN-5668] Support statement set in flink (#4332)
This is an automated email from the ASF dual-hosted git repository.
jongyoul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 4b60f08 [ZEPPELIN-5668] Support statement set in flink (#4332)
4b60f08 is described below
commit 4b60f08bd0ad241a7b197d7165abbb7c066ec979
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Mar 30 22:27:30 2022 +0800
[ZEPPELIN-5668] Support statement set in flink (#4332)
---
.github/workflows/core.yml | 2 +-
.../flink/FlinkStreamSqlInterpreterTest.java | 168 ++++++++++++++++++++-
.../zeppelin/flink/Flink113SqlInterpreter.java | 21 +--
.../zeppelin/flink/Flink114SqlInterpreter.java | 21 +--
4 files changed, 191 insertions(+), 21 deletions(-)
diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml
index d4d9b95..75cccdb 100644
--- a/.github/workflows/core.yml
+++ b/.github/workflows/core.yml
@@ -250,7 +250,7 @@ jobs:
auto-activate-base: false
use-mamba: true
- name: run tests
- run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*,FlinkIntegrationTest${{ matrix.flink }}
+ run: ./mvnw test -DskipRat -pl flink/flink-scala-2.11,flink/flink-scala-2.12,flink-cmd,zeppelin-interpreter-integration -Pflink-${{ matrix.flink }} -Phadoop2 -Pintegration -DfailIfNoTests=false -B -Dtest=org.apache.zeppelin.flink.*Test,FlinkIntegrationTest${{ matrix.flink }}
spark-integration-test:
runs-on: ubuntu-20.04
diff --git a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
index 9894764..49b609d 100644
--- a/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
+++ b/flink/flink-scala-parent/src/test/java/org/apache/zeppelin/flink/FlinkStreamSqlInterpreterTest.java
@@ -20,12 +20,16 @@ package org.apache.zeppelin.flink;
import net.jodah.concurrentunit.Waiter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterResultMessage;
import org.junit.Test;
+import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
@@ -38,11 +42,40 @@ import static org.junit.Assert.assertEquals;
public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
+
+ private static class FlinkJobListener implements JobListener {
+
+ private int jobCount = 0;
+
+ public int getJobCount() {
+ return jobCount;
+ }
+
+ @Override
+ public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
+ jobCount ++;
+ }
+
+ @Override
+ public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
+
+ }
+ }
+
+ private FlinkJobListener flinkJobListener;
+
@Override
protected FlinkSqlInterpreter createFlinkSqlInterpreter(Properties properties) {
return new FlinkStreamSqlInterpreter(properties);
}
+ @Override
+ public void setUp() throws InterpreterException, IOException {
+ super.setUp();
+ flinkJobListener = new FlinkJobListener();
+ flinkInterpreter.getStreamExecutionEnvironment().getJavaEnv().registerJobListener(flinkJobListener);
+ }
+
@Test
public void testSingleStreamSql() throws IOException, InterpreterException {
String initStreamScalaScript = getInitStreamScript(100);
@@ -434,7 +467,65 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
}
@Test
- public void testMultipleInsertInto() throws InterpreterException, IOException {
+ public void testMultipleInsertIntoSeparately() throws InterpreterException, IOException {
+ hiveShell.execute("create table source_table (id int, name string)");
+ hiveShell.execute("insert into source_table values(1, 'name')");
+
+ File destDir = Files.createTempDirectory("flink_test").toFile();
+ FileUtils.deleteDirectory(destDir);
+ InterpreterResult result = sqlInterpreter.interpret(
+ "CREATE TABLE dest_table (\n" +
+ "id int,\n" +
+ "name string" +
+ ") WITH (\n" +
+ "'format.field-delimiter'=',',\n" +
+ "'connector.type'='filesystem',\n" +
+ "'format.derive-schema'='true',\n" +
+ "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+ "'format.type'='csv'\n" +
+ ");", getInterpreterContext());
+
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ File destDir2 = Files.createTempDirectory("flink_test").toFile();
+ FileUtils.deleteDirectory(destDir2);
+ result = sqlInterpreter.interpret(
+ "CREATE TABLE dest_table2 (\n" +
+ "id int,\n" +
+ "name string" +
+ ") WITH (\n" +
+ "'format.field-delimiter'=',',\n" +
+ "'connector.type'='filesystem',\n" +
+ "'format.derive-schema'='true',\n" +
+ "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" +
+ "'format.type'='csv'\n" +
+ ");", getInterpreterContext());
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+
+ InterpreterContext context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "insert into dest_table select * from source_table;\n" +
+ "insert into dest_table2 select * from source_table",
+ context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ // two flink jobs are executed
+ assertEquals(2, flinkJobListener.getJobCount());
+
+ // check dest_table
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("c\n1\n", context.out.toString());
+
+ // check dest_table2
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("c\n1\n", context.out.toString());
+ }
+
+ @Test
+ public void testMultipleInsertIntoRunAsOne() throws InterpreterException, IOException {
hiveShell.execute("create table source_table (id int, name string)");
hiveShell.execute("insert into source_table values(1, 'name')");
@@ -475,18 +566,22 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
"insert into dest_table select * from source_table;insert into dest_table2 select * from source_table",
context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ // only one flink job is executed
+ assertEquals(1, flinkJobListener.getJobCount());
// check dest_table
context = getInterpreterContext();
result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("c\n1\n", context.out.toString());
+ assertEquals(2, flinkJobListener.getJobCount());
// check dest_table2
context = getInterpreterContext();
result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("c\n1\n", context.out.toString());
+ assertEquals(3, flinkJobListener.getJobCount());
// runAsOne won't affect the select statement.
context = getInterpreterContext();
@@ -495,8 +590,77 @@ public class FlinkStreamSqlInterpreterTest extends FlinkSqlInterpreterTest {
"select 1 as a",
context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals("a\n1\n", context.out.toString());
+ assertEquals(4, flinkJobListener.getJobCount());
+ }
+
+ @Test
+ public void testStatementSet() throws IOException, InterpreterException {
+ if (flinkInterpreter.getFlinkVersion().getMinorVersion() == 12) {
+ LOGGER.warn("Skip Flink 1.12 as statement set is not supported before 1.12");
+ return;
+ }
+ hiveShell.execute("create table source_table (id int, name string)");
+ hiveShell.execute("insert into source_table values(1, 'name')");
+
+ File destDir = Files.createTempDirectory("flink_test").toFile();
+ FileUtils.deleteDirectory(destDir);
+ InterpreterContext context = getInterpreterContext();
+ InterpreterResult result = sqlInterpreter.interpret(
+ "CREATE TABLE dest_table (\n" +
+ "id int,\n" +
+ "name string" +
+ ") WITH (\n" +
+ "'format.field-delimiter'=',',\n" +
+ "'connector.type'='filesystem',\n" +
+ "'format.derive-schema'='true',\n" +
+ "'connector.path'='" + destDir.getAbsolutePath() + "',\n" +
+ "'format.type'='csv'\n" +
+ ");", context);
+
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+ File destDir2 = Files.createTempDirectory("flink_test").toFile();
+ FileUtils.deleteDirectory(destDir2);
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "CREATE TABLE dest_table2 (\n" +
+ "id int,\n" +
+ "name string" +
+ ") WITH (\n" +
+ "'format.field-delimiter'=',',\n" +
+ "'connector.type'='filesystem',\n" +
+ "'format.derive-schema'='true',\n" +
+ "'connector.path'='" + destDir2.getAbsolutePath() + "',\n" +
+ "'format.type'='csv'\n" +
+ ");", context);
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+
+ // insert into 2 sink tables in one statement set
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret(
+ "begin statement set;\n" +
+ "insert into dest_table select * from source_table;\n" +
+ "insert into dest_table2 select * from source_table;\n" +
+ "end;",
+ context);
+ assertEquals(context.out.toString(), InterpreterResult.Code.SUCCESS, result.code());
+ // only one flink job is executed
+ assertEquals(1, flinkJobListener.getJobCount());
+
+ // check dest_table
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("select count(1) as c from dest_table", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("c\n1\n", context.out.toString());
+ assertEquals(2, flinkJobListener.getJobCount());
+
+ // check dest_table2
+ context = getInterpreterContext();
+ result = sqlInterpreter.interpret("select count(1) as c from dest_table2", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, result.code());
+ assertEquals("c\n1\n", context.out.toString());
+ assertEquals(3, flinkJobListener.getJobCount());
}
@Test
diff --git a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
index e3fe741..fa01fac 100644
--- a/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
+++ b/flink/flink1.13-shims/src/main/java/org/apache/zeppelin/flink/Flink113SqlInterpreter.java
@@ -164,8 +164,11 @@ public class Flink113SqlInterpreter {
private ZeppelinContext z;
private Parser sqlParser;
private SqlSplitter sqlSplitter;
- // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph.
- private Map<String, Boolean> statementModeMap = new HashMap<>();
+ // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax:
+ // 1. runAsOne= true
+ // 2. begin statement set;
+ // ...
+ // end;
private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
private boolean isBatch;
private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
@@ -204,7 +207,10 @@ public class Flink113SqlInterpreter {
public InterpreterResult runSqlList(String st, InterpreterContext context) {
try {
boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
- statementModeMap.put(context.getParagraphId(), runAsOne);
+ if (runAsOne) {
+ statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+ }
+
String jobName = context.getLocalProperties().get("jobName");
if (StringUtils.isNotBlank(jobName)) {
tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
@@ -259,7 +265,6 @@ public class Flink113SqlInterpreter {
return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
} finally {
statementOperationsMap.remove(context.getParagraphId());
- statementModeMap.remove(context.getParagraphId());
}
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
@@ -345,10 +350,9 @@ public class Flink113SqlInterpreter {
}
private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
- if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
- List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ if (statementOperationsMap.containsKey(context.getParagraphId())) {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
modifyOperations.add(operation);
- statementOperationsMap.put(context.getParagraphId(), modifyOperations);
} else {
callInserts(Collections.singletonList(operation), context);
}
@@ -444,7 +448,7 @@ public class Flink113SqlInterpreter {
}
private void callBeginStatementSet(InterpreterContext context) throws IOException {
- statementModeMap.put(context.getParagraphId(), true);
+ statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
}
private void callEndStatementSet(InterpreterContext context) throws IOException {
@@ -454,7 +458,6 @@ public class Flink113SqlInterpreter {
} else {
context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
}
- statementModeMap.remove(context.getParagraphId());
}
private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {
diff --git a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
index f155fcc..eb0d684 100644
--- a/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
+++ b/flink/flink1.14-shims/src/main/java/org/apache/zeppelin/flink/Flink114SqlInterpreter.java
@@ -169,8 +169,11 @@ public class Flink114SqlInterpreter {
private ZeppelinContext z;
private Parser sqlParser;
private SqlSplitter sqlSplitter;
- // paragraphId -> Boolean, indicate whether it is runAsOne mode for the current paragraph.
- private Map<String, Boolean> statementModeMap = new HashMap<>();
+ // paragraphId -> list of ModifyOperation, used for statement set in 2 syntax:
+ // 1. runAsOne= true
+ // 2. begin statement set;
+ // ...
+ // end;
private Map<String, List<ModifyOperation>> statementOperationsMap = new HashMap<>();
private boolean isBatch;
private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
@@ -209,7 +212,10 @@ public class Flink114SqlInterpreter {
public InterpreterResult runSqlList(String st, InterpreterContext context) {
try {
boolean runAsOne = Boolean.parseBoolean(context.getStringLocalProperty("runAsOne", "false"));
- statementModeMap.put(context.getParagraphId(), runAsOne);
+ if (runAsOne) {
+ statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
+ }
+
String jobName = context.getLocalProperties().get("jobName");
if (StringUtils.isNotBlank(jobName)) {
tbenv.getConfig().getConfiguration().set(PipelineOptions.NAME, jobName);
@@ -264,7 +270,6 @@ public class Flink114SqlInterpreter {
return new InterpreterResult(InterpreterResult.Code.ERROR, ExceptionUtils.getStackTrace(e));
} finally {
statementOperationsMap.remove(context.getParagraphId());
- statementModeMap.remove(context.getParagraphId());
}
return new InterpreterResult(InterpreterResult.Code.SUCCESS);
@@ -359,10 +364,9 @@ public class Flink114SqlInterpreter {
}
private void callInsert(CatalogSinkModifyOperation operation, InterpreterContext context) throws IOException {
- if (statementModeMap.getOrDefault(context.getParagraphId(), false)) {
- List<ModifyOperation> modifyOperations = statementOperationsMap.getOrDefault(context.getParagraphId(), new ArrayList<>());
+ if (statementOperationsMap.containsKey(context.getParagraphId())) {
+ List<ModifyOperation> modifyOperations = statementOperationsMap.get(context.getParagraphId());
modifyOperations.add(operation);
- statementOperationsMap.put(context.getParagraphId(), modifyOperations);
} else {
callInserts(Collections.singletonList(operation), context);
}
@@ -472,7 +476,7 @@ public class Flink114SqlInterpreter {
}
private void callBeginStatementSet(InterpreterContext context) throws IOException {
- statementModeMap.put(context.getParagraphId(), true);
+ statementOperationsMap.put(context.getParagraphId(), new ArrayList<>());
}
private void callEndStatementSet(InterpreterContext context) throws IOException {
@@ -482,7 +486,6 @@ public class Flink114SqlInterpreter {
} else {
context.out.write(MESSAGE_NO_STATEMENT_IN_STATEMENT_SET);
}
- statementModeMap.remove(context.getParagraphId());
}
private void callUseCatalog(String catalog, InterpreterContext context) throws IOException {