You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/05/20 12:28:23 UTC
[2/2] flink git commit: [FLINK-2026][api] add a flag to indicate
previous executions
[FLINK-2026][api] add a flag to indicate previous executions
This closes #686.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/825cea2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/825cea2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/825cea2d
Branch: refs/heads/master
Commit: 825cea2df10cedc572ecadd7c03825e1e9f85ccc
Parents: f253655
Author: Maximilian Michels <mx...@apache.org>
Authored: Mon May 18 12:43:21 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Wed May 20 12:24:28 2015 +0200
----------------------------------------------------------------------
.../apache/flink/api/java/ExecutionEnvironment.java | 14 +++++++++++++-
1 file changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/825cea2d/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 2e7e57c..75d4387 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -115,6 +115,9 @@ public abstract class ExecutionEnvironment {
private ExecutionConfig config = new ExecutionConfig();
+ /** Flag to indicate whether sinks have been cleared in previous executions */
+ private boolean wasExecuted = false;
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
@@ -914,7 +917,15 @@ public abstract class ExecutionEnvironment {
*/
public JavaPlan createProgramPlan(String jobName, boolean clearSinks) {
if (this.sinks.isEmpty()) {
- throw new RuntimeException("No data sinks have been created yet. A program needs at least one sink that consumes data. Examples are writing the data set or printing it.");
+ if (wasExecuted) {
+ throw new RuntimeException("No new data sinks have been defined since the " +
+ "last execution. The last execution refers to the latest call to " +
+ "'execute()', 'count()', 'collect()', or 'print()'.");
+ } else {
+ throw new RuntimeException("No data sinks have been created yet. " +
+ "A program needs at least one sink that consumes data. " +
+ "Examples are writing the data set or printing it.");
+ }
}
if (jobName == null) {
@@ -962,6 +973,7 @@ public abstract class ExecutionEnvironment {
// clear all the sinks such that the next execution does not redo everything
if (clearSinks) {
this.sinks.clear();
+ wasExecuted = true;
}
// All types are registered now. Print information.