You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/31 11:54:43 UTC

[flink] 02/06: [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor

This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6fc02d1cb956ca4fe1709f032906d3dc80fb5a6c
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:42:20 2019 +0200

    [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor
---
 .../flink/table/planner/delegation/BatchExecutor.java      |  6 ++----
 .../flink/table/planner/delegation/ExecutorBase.java       | 14 +++++++++++---
 .../flink/table/planner/delegation/StreamExecutor.java     |  5 ++---
 3 files changed, 15 insertions(+), 10 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
index de7fd3c..7bf4367 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
@@ -51,7 +51,7 @@ public class BatchExecutor extends ExecutorBase {
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
-		StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+		StreamGraph streamGraph = generateStreamGraph(jobName);
 		return execEnv.execute(streamGraph);
 	}
 
@@ -69,9 +69,7 @@ public class BatchExecutor extends ExecutorBase {
 		}
 	}
 
-	/**
-	 * Translates transformationList to streamGraph.
-	 */
+	@Override
 	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
 		StreamExecutionEnvironment execEnv = getExecutionEnvironment();
 		setBatchProperties(execEnv);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
index 701a6cc..10eeafd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
@@ -58,9 +58,17 @@ public abstract class ExecutorBase implements Executor {
 		return executionEnvironment;
 	}
 
-	public abstract StreamGraph generateStreamGraph(
-			List<Transformation<?>> transformations,
-			String jobName) throws Exception;
+	/**
+	 * Translates the applied transformations to a stream graph.
+	 */
+	public StreamGraph generateStreamGraph(String jobName) {
+		return generateStreamGraph(transformations, jobName);
+	}
+
+	/**
+	 * Translates the given transformations to a stream graph.
+	 */
+	public abstract StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName);
 
 	protected String getNonEmptyJobName(String jobName) {
 		if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
index 4af2f8e..8d1e904 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
@@ -46,9 +46,8 @@ public class StreamExecutor extends ExecutorBase {
 		return execEnv.execute(generateStreamGraph(transformations, jobName));
 	}
 
-	public StreamGraph generateStreamGraph(
-			List<Transformation<?>> transformations,
-			String jobName) throws Exception {
+	@Override
+	public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
 		transformations.forEach(getExecutionEnvironment()::addOperator);
 		return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
 	}