Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/10 16:36:40 UTC

flink git commit: [streaming] Allow force-enabling checkpoints for iterative jobs

Repository: flink
Updated Branches:
  refs/heads/master a8e085078 -> 4fe2e18b7


[streaming] Allow force-enabling checkpoints for iterative jobs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fe2e18b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fe2e18b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fe2e18b

Branch: refs/heads/master
Commit: 4fe2e18b7df37ca25f71a274d94b7c14a540f698
Parents: a8e0850
Author: Gyula Fora <gy...@apache.org>
Authored: Wed Jun 10 15:33:58 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Jun 10 15:35:28 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 14 +++++++--
 .../environment/StreamExecutionEnvironment.java | 31 ++++++++++++++++++++
 .../flink/streaming/api/graph/StreamGraph.java  | 11 +++++--
 .../api/graph/StreamingJobGraphGenerator.java   |  1 -
 .../runtime/tasks/StreamIterationHead.java      |  2 ++
 .../runtime/tasks/StreamIterationTail.java      |  3 ++
 .../streaming/runtime/tasks/StreamTask.java     |  2 +-
 .../apache/flink/streaming/api/IterateTest.java | 16 ++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 19 ++++++++++++
 9 files changed, 93 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index de6340e..0a7a486 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1188,7 +1188,15 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 Stateful computation
 ------------
 
-Flink supports the checkpointing and persistence of user defined state, so in case of a failure this state can be restored to the latest checkpoint and the processing can continue from there. This gives exactly once semantics for anything that is sotred in the state. For example when implementing a rolling count over the stream Flink gives you the possibility to safely store the counter. Another common usacase is when reading from a Kafka source to save the latest committed offset to catch up from. To mark a source for checkpointing it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or preferably its special case where the checkpointing can be done asynchronously, `CheckpointedAsynchronously`. For example let us write a reduce function that besides summing the data it also counts have many elements it has seen.
+Flink supports the checkpointing and persistence of user-defined state, so in case of a failure this state can be restored to the latest checkpoint and processing can continue from there. This gives exactly-once semantics for anything stored in the state, provided that the sources are stateful as well and checkpoint their current offset; the `PersistentKafkaSource`, for example, provides this stateful functionality.
+
+For example, when implementing a rolling count over the stream, Flink lets you safely store the counter. Another common use case is saving the latest committed offset when reading from a Kafka source, so that processing can catch up from there. To mark a function for checkpointing, it has to implement the `flink.streaming.api.checkpoint.Checkpointed` interface or, preferably, its special case `CheckpointedAsynchronously`, where the checkpointing can be done asynchronously.
+
+Checkpointing can be enabled from the `StreamExecutionEnvironment` using the `enableCheckpointing(…)` method, to which additional parameters can be passed to modify the default 5-second checkpoint interval.
+
+By default, state checkpoints are stored in memory at the JobManager. Flink also supports storing the checkpoints on any Flink-supported file system (such as HDFS or Tachyon), which can be configured in `flink-conf.yaml`.
+
+For example, let us write a reduce function that, besides summing the data, also counts how many elements it has seen.
 
 {% highlight java %}
 public class CounterSum implements ReduceFunction<Long>, CheckpointedAsynchronously<Long> {
@@ -1257,7 +1265,9 @@ public static class CounterSource implements SourceFunction<Long>, CheckpointedA
 }
 {% endhighlight %}
 
-Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointComitter` interface. 
+Some operators might need to know when a checkpoint has been fully acknowledged by Flink in order to communicate that to the outside world. In this case, see the `flink.streaming.api.checkpoint.CheckpointComitter` interface.
+
+Flink currently provides processing guarantees only for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. To force checkpointing on an iterative program, the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`. The guarantees are then only best-effort: records in transit inside the loop may be lost upon failure.
 
 [Back to top](#top)
 

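The guide describes functions whose state is snapshotted at each checkpoint and restored after a failure, using a `CounterSum` reduce function that implements `ReduceFunction<Long>` and `CheckpointedAsynchronously<Long>`. The plain-Java sketch below illustrates that snapshot/restore contract without depending on Flink. `SnapshottedFunction` is a hypothetical stand-in whose two methods are assumed to mirror the shape of `Checkpointed` (`snapshotState(checkpointId, timestamp)` and `restoreState(state)`). `CounterSumSketch` is likewise hypothetical and is not the guide's actual code.

```java
public class CheckpointedCounterSketch {

    /** Hypothetical stand-in for a checkpointed function; NOT the Flink interface. */
    interface SnapshottedFunction<T> {
        T snapshotState(long checkpointId, long checkpointTimestamp);
        void restoreState(T state);
    }

    /** Sums values and counts how many reductions it has performed; the count is the checkpointed state. */
    static class CounterSumSketch implements SnapshottedFunction<Long> {
        private long counter = 0;

        long reduce(long a, long b) {
            counter++;
            return a + b;
        }

        long getCounter() {
            return counter;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            return counter; // the value that would be persisted with the checkpoint
        }

        @Override
        public void restoreState(Long state) {
            counter = state; // called on a fresh instance after a failure
        }
    }

    public static void main(String[] args) {
        CounterSumSketch fn = new CounterSumSketch();
        long sum = fn.reduce(1, 2);     // counter = 1
        sum = fn.reduce(sum, 3);        // counter = 2
        Long checkpoint = fn.snapshotState(1L, System.currentTimeMillis());

        fn.reduce(sum, 4);              // counter = 3, not covered by the checkpoint

        // Simulated failure: a new instance is restored from the last checkpoint.
        CounterSumSketch recovered = new CounterSumSketch();
        recovered.restoreState(checkpoint);
        System.out.println(recovered.getCounter()); // prints 2
    }
}
```

The reduction performed after the checkpoint is not reflected in the restored counter. Exactly-once results therefore depend on the source rewinding to its own checkpointed offset so that the element is processed again, which is why the guide requires stateful sources.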
http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index f009871..b43e123 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -241,6 +241,37 @@ public abstract class StreamExecutionEnvironment {
 		streamGraph.setCheckpointingInterval(interval);
 		return this;
 	}
+	
+	/**
+	 * Method for force-enabling fault-tolerance. Activates monitoring and
+	 * backup of streaming operator states even for jobs containing iterations.
+	 * 
+	 * Please note that the checkpoint/restore guarantees for iterative jobs are
+	 * only best-effort at the moment. Records inside the loops may be lost
+	 * during failure.
+	 * <p/>
+	 * <p/>
+	 * Setting this option assumes that the job is used in production; thus,
+	 * unless explicitly configured otherwise by calling the
+	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method,
+	 * the job will be resubmitted to the cluster indefinitely in case of
+	 * failure.
+	 * 
+	 * @param interval
+	 *            Time interval between state checkpoints in millis
+	 * @param force
+	 *            If true checkpointing will be enabled for iterative jobs as
+	 *            well
+	 */
+	@Deprecated
+	public StreamExecutionEnvironment enableCheckpointing(long interval, boolean force) {
+		streamGraph.setCheckpointingEnabled(true);
+		streamGraph.setCheckpointingInterval(interval);
+		if (force) {
+			streamGraph.forceCheckpoint();
+		}
+		return this;
+	}
 
 	/**
 	 * Method for enabling fault-tolerance. Activates monitoring and backup of

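The new overload differs from `enableCheckpointing(long)` only in that it may additionally set a force flag on the stream graph. Below is a minimal plain-Java model of that propagation. `GraphSettings` and `EnvSketch` are hypothetical names standing in for `StreamGraph` and `StreamExecutionEnvironment`, and only the three fields touched by the commit are modeled. The single-argument overload is assumed to enable checkpointing and set the interval. As in the diff, the force flag is only ever set and never cleared, so a later call with `force = false` does not undo an earlier forced call.

```java
public class ForceCheckpointSketch {

    /** Hypothetical stand-in for the StreamGraph fields touched by the commit. */
    static class GraphSettings {
        boolean checkpointingEnabled = false;
        long checkpointingInterval = 0L;
        boolean forceCheckpoint = false;
    }

    /** Hypothetical stand-in for StreamExecutionEnvironment. */
    static class EnvSketch {
        final GraphSettings graph = new GraphSettings();

        // Assumed behavior of the existing single-argument overload.
        EnvSketch enableCheckpointing(long interval) {
            graph.checkpointingEnabled = true;
            graph.checkpointingInterval = interval;
            return this;
        }

        // Mirrors the new overload (delegating here for brevity); force is a one-way switch.
        EnvSketch enableCheckpointing(long interval, boolean force) {
            enableCheckpointing(interval);
            if (force) {
                graph.forceCheckpoint = true;
            }
            return this;
        }
    }

    public static void main(String[] args) {
        EnvSketch env = new EnvSketch();
        env.enableCheckpointing(1000L, true).enableCheckpointing(2000L, false);
        // The interval is overwritten, but the force flag survives the second call.
        System.out.println(env.graph.checkpointingInterval + " " + env.graph.forceCheckpoint); // prints "2000 true"
    }
}
```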
http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index d559ed3..8ef4ca0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -79,6 +79,7 @@ public class StreamGraph extends StreamingPlan {
 	private Map<Integer, StreamLoop> streamLoops;
 	protected Map<Integer, StreamLoop> vertexIDtoLoop;
 	private StateHandleProvider<?> stateHandleProvider;
+	private boolean forceCheckpoint = false;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
 
@@ -118,6 +119,10 @@ public class StreamGraph extends StreamingPlan {
 	public void setCheckpointingInterval(long checkpointingInterval) {
 		this.checkpointingInterval = checkpointingInterval;
 	}
+	
+	public void forceCheckpoint() {
+		this.forceCheckpoint = true;	
+	}
 
 	public void setStateHandleProvider(StateHandleProvider<?> provider) {
 		this.stateHandleProvider = provider;
@@ -408,9 +413,11 @@ public class StreamGraph extends StreamingPlan {
 	public JobGraph getJobGraph(String jobGraphName) {
 
 		// temporarily forbid checkpointing for iterative jobs
-		if (isIterative() && isCheckpointingEnabled()) {
+		if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
 			throw new UnsupportedOperationException(
-					"Checkpointing is currently not supported for iterative jobs!");
+					"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+					+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
+					+ "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
 		}
 
 		setJobName(jobGraphName);

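After this change, the guard in `getJobGraph` rejects a job only when three conditions hold at once: the graph contains an iteration, checkpointing is enabled, and checkpointing was not forced. The sketch below isolates that condition in a hypothetical static helper so each combination can be exercised directly. `checkIterativeCheckpointing` and `accepted` are illustrative names, not Flink methods, and the exception message is shortened.

```java
public class IterativeCheckpointGuardSketch {

    /**
     * Hypothetical helper isolating the condition added to StreamGraph.getJobGraph:
     * iterative + checkpointing enabled + not forced is rejected; everything else passes.
     */
    static void checkIterativeCheckpointing(boolean iterative, boolean checkpointingEnabled,
                                            boolean forceCheckpoint) {
        if (iterative && checkpointingEnabled && !forceCheckpoint) {
            throw new UnsupportedOperationException(
                    "Checkpointing is currently not supported by default for iterative jobs.");
        }
    }

    /** Returns true if the job graph would be generated, false if the guard throws. */
    static boolean accepted(boolean iterative, boolean enabled, boolean force) {
        try {
            checkIterativeCheckpointing(iterative, enabled, force);
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepted(true, true, false));  // false: the case IterateTest expects to fail
        System.out.println(accepted(true, true, true));   // true: checkpointing forced
        System.out.println(accepted(false, true, false)); // true: no iteration in the job
    }
}
```

This is the same sequence the updated `IterateTest` exercises: `enableCheckpointing(1, false)` on an iterative job must still throw, while `enableCheckpointing(1, true)` must allow `getJobGraph()` to succeed.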
http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 9e12a68..6a6e899 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -398,7 +398,6 @@ public class StreamingJobGraphGenerator {
 
 			JobSnapshottingSettings settings = new JobSnapshottingSettings(
 					triggerVertices, ackVertices, commitVertices, interval);
-			
 			jobGraph.setSnapshotSettings(settings);
 
 			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();

http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 343f495..e69f533 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -65,6 +65,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 	@SuppressWarnings("unchecked")
 	@Override
 	public void invoke() throws Exception {
+		isRunning = true;
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Iteration source {} invoked", getName());
 		}
@@ -96,6 +97,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 		}
 		finally {
 			// Cleanup
+			isRunning = false;
 			outputHandler.flushOutputs();
 			clearBuffers();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index 9d36516..5bbae06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -57,6 +57,8 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 
 	@Override
 	public void invoke() throws Exception {
+		isRunning = true;
+		
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Iteration sink {} invoked", getName());
 		}
@@ -74,6 +76,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 		}
 		finally {
 			// Cleanup
+			isRunning = false;
 			clearBuffers();
 		}
 	}

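The iteration head and tail override `invoke()`. Before this change, they apparently never set the `isRunning` flag that `StreamTask.triggerCheckpoint` checks inside `synchronized (checkpointLock)`, so a checkpoint trigger reaching these tasks would likely have been skipped. The plain-Java sketch below models the pattern: the flag is set on entry and cleared in `finally`, and checkpoints are honored only while it is set. `TaskSketch`, `runBody`, and `snapshotCount` are hypothetical names, and the real `triggerCheckpoint` does considerably more than increment a counter.

```java
public class IsRunningGuardSketch {

    /** Hypothetical model of a stream task using the isRunning / checkpointLock pattern. */
    static class TaskSketch {
        private final Object checkpointLock = new Object();
        private volatile boolean isRunning = false;
        private int snapshotCount = 0;

        /** Mirrors the head/tail change: set the flag on entry, clear it in finally. */
        void invoke(Runnable runBody) {
            isRunning = true;
            try {
                runBody.run();
            } finally {
                isRunning = false;
            }
        }

        /** Mirrors StreamTask.triggerCheckpoint: only snapshot while the task is running. */
        void triggerCheckpoint(long checkpointId) {
            synchronized (checkpointLock) {
                if (isRunning) {
                    snapshotCount++; // stand-in for taking and acknowledging a state snapshot
                }
            }
        }

        int getSnapshotCount() {
            synchronized (checkpointLock) {
                return snapshotCount;
            }
        }
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch();
        task.triggerCheckpoint(1L);                    // ignored: not running yet
        task.invoke(() -> task.triggerCheckpoint(2L)); // taken: triggered while running
        task.triggerCheckpoint(3L);                    // ignored: invoke() has returned
        System.out.println(task.getSnapshotCount());   // prints 1
    }
}
```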
http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bb641d9..b55272c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -229,7 +229,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	@Override
 	public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
-
+		
 		synchronized (checkpointLock) {
 			if (isRunning) {
 				try {

http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index ebc4c93..9c6ff5c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -207,6 +207,22 @@ public class IterateTest {
 		} catch (UnsupportedOperationException e) {
 			// expected behaviour
 		}
+		
+		
+		// Test force checkpointing
+
+		try {
+			env.enableCheckpointing(1, false);
+			env.execute();
+
+			// this statement should never be reached
+			fail();
+		} catch (UnsupportedOperationException e) {
+			// expected behaviour
+		}
+		
+		env.enableCheckpointing(1, true);
+		env.getStreamGraph().getJobGraph();
 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fe2e18b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7371c91..c5030ed 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -118,12 +118,31 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   /**
    * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
   * operator states. Time interval between state checkpoints is specified in millis.
+   * 
+   * If the force flag is set to true, checkpointing will be enabled for iterative jobs as
+   * well. Please note that the checkpoint/restore guarantees for iterative jobs are
+   * only best-effort at the moment. Records inside the loops may be lost during failure.
    *
   * Setting this option assumes that the job is used in production; thus, unless
   * explicitly configured otherwise by calling the
   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method, the job
   * will be resubmitted to the cluster indefinitely in case of failure.
    */
+  @deprecated
+  def enableCheckpointing(interval : Long, force: Boolean) : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing(interval, force)
+    this
+  }
+  
+   /**
+   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
+   * operator states. Time interval between state checkpoints is specified in millis.
+   * 
+   * Setting this option assumes that the job is used in production; thus, unless
+   * explicitly configured otherwise by calling the
+   * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method, the job
+   * will be resubmitted to the cluster indefinitely in case of failure.
+   */
   def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
     javaEnv.enableCheckpointing(interval)
     this