You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2019/07/31 11:54:43 UTC
[flink] 02/06: [FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor
This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6fc02d1cb956ca4fe1709f032906d3dc80fb5a6c
Author: Timo Walther <tw...@apache.org>
AuthorDate: Mon Jul 29 11:42:20 2019 +0200
[FLINK-13273][table-planner-blink] Allow retrieving StreamGraph from Blink executor
---
.../flink/table/planner/delegation/BatchExecutor.java | 6 ++----
.../flink/table/planner/delegation/ExecutorBase.java | 14 +++++++++++---
.../flink/table/planner/delegation/StreamExecutor.java | 5 ++---
3 files changed, 15 insertions(+), 10 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
index de7fd3c..7bf4367 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/BatchExecutor.java
@@ -51,7 +51,7 @@ public class BatchExecutor extends ExecutorBase {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
- StreamGraph streamGraph = generateStreamGraph(transformations, jobName);
+ StreamGraph streamGraph = generateStreamGraph(jobName);
return execEnv.execute(streamGraph);
}
@@ -69,9 +69,7 @@ public class BatchExecutor extends ExecutorBase {
}
}
- /**
- * Translates transformationList to streamGraph.
- */
+ @Override
public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
StreamExecutionEnvironment execEnv = getExecutionEnvironment();
setBatchProperties(execEnv);
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
index 701a6cc..10eeafd 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/ExecutorBase.java
@@ -58,9 +58,17 @@ public abstract class ExecutorBase implements Executor {
return executionEnvironment;
}
- public abstract StreamGraph generateStreamGraph(
- List<Transformation<?>> transformations,
- String jobName) throws Exception;
+ /**
+ * Translates the applied transformations to a stream graph.
+ */
+ public StreamGraph generateStreamGraph(String jobName) {
+ return generateStreamGraph(transformations, jobName);
+ }
+
+ /**
+ * Translates the given transformations to a stream graph.
+ */
+ public abstract StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName);
protected String getNonEmptyJobName(String jobName) {
if (StringUtils.isNullOrWhitespaceOnly(jobName)) {
diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
index 4af2f8e..8d1e904 100644
--- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
+++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/delegation/StreamExecutor.java
@@ -46,9 +46,8 @@ public class StreamExecutor extends ExecutorBase {
return execEnv.execute(generateStreamGraph(transformations, jobName));
}
- public StreamGraph generateStreamGraph(
- List<Transformation<?>> transformations,
- String jobName) throws Exception {
+ @Override
+ public StreamGraph generateStreamGraph(List<Transformation<?>> transformations, String jobName) {
transformations.forEach(getExecutionEnvironment()::addOperator);
return getExecutionEnvironment().getStreamGraph(getNonEmptyJobName(jobName));
}