You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/05/20 12:28:23 UTC

[2/2] flink git commit: [FLINK-2026][api] add a flag to indicate previous executions

[FLINK-2026][api] add a flag to indicate previous executions

This closes #686.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/825cea2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/825cea2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/825cea2d

Branch: refs/heads/master
Commit: 825cea2df10cedc572ecadd7c03825e1e9f85ccc
Parents: f253655
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon May 18 12:43:21 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed May 20 12:24:28 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/api/java/ExecutionEnvironment.java   | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/825cea2d/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 2e7e57c..75d4387 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -115,6 +115,9 @@ public abstract class ExecutionEnvironment {
 
 	private ExecutionConfig config = new ExecutionConfig();
 
+	/** Flag to indicate whether sinks have been cleared in previous executions */
+	private boolean wasExecuted = false;
+
 	// --------------------------------------------------------------------------------------------
 	//  Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -914,7 +917,15 @@ public abstract class ExecutionEnvironment {
 	 */
 	public JavaPlan createProgramPlan(String jobName, boolean clearSinks) {
 		if (this.sinks.isEmpty()) {
-			throw new RuntimeException("No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.");
+			if (wasExecuted) {
+				throw new RuntimeException("No new data sinks have been defined since the " +
+						"last execution. The last execution refers to the latest call to " +
+						"'execute()', 'count()', 'collect()', or 'print()'.");
+			} else {
+				throw new RuntimeException("No data sinks have been created yet. " +
+						"A program needs at least one sink that consumes data. " +
+						"Examples are writing the data set or printing it.");
+			}
 		}
 		
 		if (jobName == null) {
@@ -962,6 +973,7 @@ public abstract class ExecutionEnvironment {
 		// clear all the sinks such that the next execution does not redo everything
 		if (clearSinks) {
 			this.sinks.clear();
+			wasExecuted = true;
 		}
 
 		// All types are registered now. Print information.