You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/07/08 07:23:26 UTC
[flink] branch master updated: [FLINK-12961][datastream] Add
internal StreamExecutionEnvironment.execute(StreamGraph)
This is an automated email from the ASF dual-hosted git repository.
aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f1542dd [FLINK-12961][datastream] Add internal StreamExecutionEnvironment.execute(StreamGraph)
f1542dd is described below
commit f1542dd37a3fd985aa9a688f4951942428b460be
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Tue Jun 25 11:54:27 2019 +0800
[FLINK-12961][datastream] Add internal StreamExecutionEnvironment.execute(StreamGraph)
---
.../Elasticsearch6UpsertTableSinkFactoryTest.java | 3 ++-
.../kafka/KafkaTableSourceSinkFactoryTestBase.java | 3 ++-
.../connectors/kafka/testutils/DataGenerators.java | 3 ++-
.../api/environment/LocalStreamEnvironment.java | 7 +------
.../api/environment/RemoteStreamEnvironment.java | 3 +--
.../api/environment/StreamContextEnvironment.java | 7 +------
.../api/environment/StreamExecutionEnvironment.java | 18 +++++++++++++++++-
.../api/environment/StreamPlanEnvironment.java | 5 +----
.../flink/streaming/util/TestStreamEnvironment.java | 3 +--
9 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
index ce9bde2..a9f0814 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
@@ -183,7 +184,7 @@ public class Elasticsearch6UpsertTableSinkFactoryTest extends ElasticsearchUpser
private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
@Override
- public JobExecutionResult execute(String jobName) {
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
throw new UnsupportedOperationException();
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index c334852..fb920fa 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -237,7 +238,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
}
@Override
- public JobExecutionResult execute(String jobName) {
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
throw new UnsupportedOperationException();
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 6b05f19..76e6f9d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
@@ -214,7 +215,7 @@ public class DataGenerators {
private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
@Override
- public JobExecutionResult execute(String jobName) throws Exception {
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
return null;
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 5fce32d..b71006b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -79,15 +79,10 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
* Executes the JobGraph of the streaming program on a mini cluster of ClusterUtil with a user
* specified name.
*
- * @param jobName
- * name of the job
* @return The result of the job execution, containing elapsed time and accumulators.
*/
@Override
- public JobExecutionResult execute(String jobName) throws Exception {
- // transform the streaming program into a JobGraph
- StreamGraph streamGraph = getStreamGraph(jobName);
-
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 324478e..d8dca00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -302,8 +302,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
- StreamGraph streamGraph = getStreamGraph(jobName);
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
return executeRemotely(streamGraph, jarFiles);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index f523117..bc7ce53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.DetachedEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,11 +46,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute(String jobName) throws Exception {
- Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
-
- StreamGraph streamGraph = this.getStreamGraph(jobName);
-
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
// execute the programs
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2bfc8a2..47a8032 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1502,7 +1502,23 @@ public abstract class StreamExecutionEnvironment {
* @return The result of the job execution, containing elapsed time and accumulators.
* @throws Exception which occurs during job execution.
*/
- public abstract JobExecutionResult execute(String jobName) throws Exception;
+ public JobExecutionResult execute(String jobName) throws Exception {
+ Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
+
+ return execute(getStreamGraph(jobName));
+ }
+
+ /**
+ * Triggers the program execution. The environment will execute all parts of
+ * the program that have resulted in a "sink" operation. Sink operations are
+ * for example printing results or forwarding them to a message queue.
+ *
+ * @param streamGraph the stream graph representing the transformations
+ * @return The result of the job execution, containing elapsed time and accumulators.
+ * @throws Exception which occurs during job execution.
+ */
+ @Internal
+ public abstract JobExecutionResult execute(StreamGraph streamGraph) throws Exception;
/**
* Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 28a8a0a..0150d2b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -54,10 +54,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute(String jobName) throws Exception {
-
- StreamGraph streamGraph = getStreamGraph(jobName);
-
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
transformations.clear();
if (env instanceof OptimizerPlanEnvironment) {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 79365c8..ef345e0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -65,8 +65,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
}
@Override
- public JobExecutionResult execute(String jobName) throws Exception {
- final StreamGraph streamGraph = getStreamGraph(jobName);
+ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
final JobGraph jobGraph = streamGraph.getJobGraph();
for (Path jarFile : jarFiles) {