You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2019/07/08 07:23:26 UTC

[flink] branch master updated: [FLINK-12961][datastream] Add internal StreamExecutionEnvironment.execute(StreamGraph)

This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1542dd  [FLINK-12961][datastream] Add internal StreamExecutionEnvironment.execute(StreamGraph)
f1542dd is described below

commit f1542dd37a3fd985aa9a688f4951942428b460be
Author: ifndef-SleePy <mm...@gmail.com>
AuthorDate: Tue Jun 25 11:54:27 2019 +0800

    [FLINK-12961][datastream] Add internal StreamExecutionEnvironment.execute(StreamGraph)
---
 .../Elasticsearch6UpsertTableSinkFactoryTest.java      |  3 ++-
 .../kafka/KafkaTableSourceSinkFactoryTestBase.java     |  3 ++-
 .../connectors/kafka/testutils/DataGenerators.java     |  3 ++-
 .../api/environment/LocalStreamEnvironment.java        |  7 +------
 .../api/environment/RemoteStreamEnvironment.java       |  3 +--
 .../api/environment/StreamContextEnvironment.java      |  7 +------
 .../api/environment/StreamExecutionEnvironment.java    | 18 +++++++++++++++++-
 .../api/environment/StreamPlanEnvironment.java         |  5 +----
 .../flink/streaming/util/TestStreamEnvironment.java    |  3 +--
 9 files changed, 28 insertions(+), 24 deletions(-)

diff --git a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
index ce9bde2..a9f0814 100644
--- a/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
+++ b/flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6UpsertTableSinkFactoryTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
 import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkBase;
@@ -183,7 +184,7 @@ public class Elasticsearch6UpsertTableSinkFactoryTest extends ElasticsearchUpser
 	private static class StreamExecutionEnvironmentMock extends StreamExecutionEnvironment {
 
 		@Override
-		public JobExecutionResult execute(String jobName) {
+		public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
index c334852..fb920fa 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
@@ -237,7 +238,7 @@ public abstract class KafkaTableSourceSinkFactoryTestBase extends TestLogger {
 		}
 
 		@Override
-		public JobExecutionResult execute(String jobName) {
+		public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 			throw new UnsupportedOperationException();
 		}
 	}
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
index 6b05f19..76e6f9d 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
 import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
@@ -214,7 +215,7 @@ public class DataGenerators {
 		private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
 
 			@Override
-			public JobExecutionResult execute(String jobName) throws Exception {
+			public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 				return null;
 			}
 		}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
index 5fce32d..b71006b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -79,15 +79,10 @@ public class LocalStreamEnvironment extends StreamExecutionEnvironment {
 	 * Executes the JobGraph of the on a mini cluster of ClusterUtil with a user
 	 * specified name.
 	 *
-	 * @param jobName
-	 *            name of the job
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 */
 	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		// transform the streaming program into a JobGraph
-		StreamGraph streamGraph = getStreamGraph(jobName);
-
+	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		JobGraph jobGraph = streamGraph.getJobGraph();
 		jobGraph.setAllowQueuedScheduling(true);
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
index 324478e..d8dca00 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -302,8 +302,7 @@ public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
-		StreamGraph streamGraph = getStreamGraph(jobName);
+	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
 		return executeRemotely(streamGraph, jarFiles);
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index f523117..bc7ce53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.DetachedEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
-import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,11 +46,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
-
-		StreamGraph streamGraph = this.getStreamGraph(jobName);
-
+	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
 
 		// execute the programs
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 2bfc8a2..47a8032 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1502,7 +1502,23 @@ public abstract class StreamExecutionEnvironment {
 	 * @return The result of the job execution, containing elapsed time and accumulators.
 	 * @throws Exception which occurs during job execution.
 	 */
-	public abstract JobExecutionResult execute(String jobName) throws Exception;
+	public JobExecutionResult execute(String jobName) throws Exception {
+		Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
+
+		return execute(getStreamGraph(jobName));
+	}
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 *
+	 * @param streamGraph the stream graph representing the transformations
+	 * @return The result of the job execution, containing elapsed time and accumulators.
+	 * @throws Exception which occurs during job execution.
+	 */
+	@Internal
+	public abstract JobExecutionResult execute(StreamGraph streamGraph) throws Exception;
 
 	/**
 	 * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
index 28a8a0a..0150d2b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -54,10 +54,7 @@ public class StreamPlanEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-
-		StreamGraph streamGraph = getStreamGraph(jobName);
-
+	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
 
 		if (env instanceof OptimizerPlanEnvironment) {
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index 79365c8..ef345e0 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -65,8 +65,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 	}
 
 	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		final StreamGraph streamGraph = getStreamGraph(jobName);
+	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		final JobGraph jobGraph = streamGraph.getJobGraph();
 
 		for (Path jarFile : jarFiles) {