You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 18:43:56 UTC

[1/2] flink git commit: [FLINK-2407] [streaming] Add an API switch to choose between "exactly once" and "at least once".

Repository: flink
Updated Branches:
  refs/heads/master 7bd57d789 -> b211a6211


[FLINK-2407] [streaming] Add an API switch to choose between "exactly once" and "at least once".


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b211a621
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b211a621
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b211a621

Branch: refs/heads/master
Commit: b211a62111aa3c558586874d0ec5b168e6bb31f1
Parents: 833862a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 29 14:12:42 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 29 18:43:14 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/CheckpointingMode.java  |  75 +++++++++++
 .../environment/StreamExecutionEnvironment.java | 125 +++++++++++++------
 .../flink/streaming/api/graph/StreamConfig.java |  24 +++-
 .../flink/streaming/api/graph/StreamGraph.java  |  11 ++
 .../api/graph/StreamingJobGraphGenerator.java   |  12 +-
 .../runtime/io/StreamInputProcessor.java        |  13 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  14 ++-
 .../runtime/tasks/OneInputStreamTask.java       |   1 +
 .../runtime/tasks/TwoInputStreamTask.java       |   1 +
 .../apache/flink/streaming/api/IterateTest.java |   6 +-
 .../flink/streaming/graph/TranslationTest.java  |  74 +++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  |  90 +++++++++----
 12 files changed, 375 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
new file mode 100644
index 0000000..db46d00
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The checkpointing mode defines what consistency guarantees the system gives in the presence of
+ * failures.
+ * 
+ * <p>When checkpointing is activated, the data streams are replayed such that lost parts of the
+ * processing are repeated. For stateful operations and functions, the checkpointing mode defines
+ * whether the system draws checkpoints such that a recovery behaves as if the operators/functions
+ * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn
+ * in a simpler fashion that typically encounteres some duplicates upon recovery
+ * ({@link #AT_LEAST_ONCE})</p> 
+ */
+public enum CheckpointingMode {
+
+	/**
+	 * Sets the checkpointing mode to "exactly once". This mode means that the system will
+	 * checkpoint the operator and user function state in such a way that, upon recovery,
+	 * every record will be reflected exactly once in the operator state.
+	 * 
+	 * <p>For example, if a user function counts the number of elements in a stream, 
+	 * this number will consistently be equal to the number of actual elements in the stream,
+	 * regardless of failures and recovery.</p>
+	 * 
+	 * <p>Note that this does not mean that each record flows through the streaming data flow
+	 * only once. It means that upon recovery, the state of operators/functions is restored such
+	 * that the resumed data streams pick up exactly at after the last modification to the state.</p> 
+	 *  
+	 * <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
+	 * external systems (only state in Flink's operators and user functions). The reason for that
+	 * is that a certain level of "collaboration" is required between two systems to achieve
+	 * exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
+	 * this collaboration.</p>
+	 * 
+	 * <p>This mode sustains high throughput. Depending on the data flow graph and operations,
+	 * this mode may increase the record latency, because operators need to align their input
+	 * streams, in order to create a consistent snapshot point. The latency increase for simple
+	 * dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
+	 * latency remains small, but the slowest records typically have an increased latency.</p>
+	 */
+	EXACTLY_ONCE,
+
+	/**
+	 * Sets the checkpointing mode to "at least once". This mode means that the system will
+	 * checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
+	 * some records may be reflected multiple times in the operator state.
+	 * 
+	 * <p>For example, if a user function counts the number of elements in a stream, 
+	 * this number will equal to, or larger, than the actual number of elements in the stream,
+	 * in the presence of failure and recovery.</p>
+	 * 
+	 * <p>This mode has minimal impact on latency and may be preferable in very-low latency
+	 * scenarios, where a sustained very-low latency (such as few milliseconds) is needed,
+	 * and where occasional duplicate messages (on recovery) do not matter.</p>
+	 */
+	AT_LEAST_ONCE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 58348e3..fba4e28 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -45,6 +45,7 @@ import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.FileStateHandle;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
@@ -224,51 +225,94 @@ public abstract class StreamExecutionEnvironment {
 		return this;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Checkpointing Settings
+	// ------------------------------------------------------------------------
+	
 	/**
-	 * Method for enabling fault-tolerance. Activates monitoring and backup of
-	 * streaming operator states.
-	 * <p/>
-	 * <p/>
-	 * Setting this option assumes that the job is used in production and thus
-	 * if not stated explicitly otherwise with calling with the
-	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
-	 * in case of failure the job will be resubmitted to the cluster
-	 * indefinitely.
+	 * Enables checkpointing for the streaming job. The distributed state of the streaming
+	 * dataflow will be periodically snapshotted. In case of a failure, the streaming
+	 * dataflow will be restarted from the latest completed checkpoint. This method selects
+	 * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
+	 * 
+	 * <p>The job draws checkpoints periodically, in the given interval. The state will be
+	 * stored in the configured state backend.</p>
+	 * 
+	 * <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+	 * the moment. For that reason, iterative jobs will not be started if used
+	 * with enabled checkpointing. To override this mechanism, use the 
+	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
 	 *
-	 * @param interval
-	 * 		Time interval between state checkpoints in millis
+	 * @param interval Time interval between state checkpoints in milliseconds.
 	 */
 	public StreamExecutionEnvironment enableCheckpointing(long interval) {
+		return enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
+	}
+
+	/**
+	 * Enables checkpointing for the streaming job. The distributed state of the streaming
+	 * dataflow will be periodically snapshotted. In case of a failure, the streaming
+	 * dataflow will be restarted from the latest completed checkpoint.
+	 *
+	 * <p>The job draws checkpoints periodically, in the given interval. The system uses the
+	 * given {@link CheckpointingMode} for the checkpointing ("exactly once" vs "at least once").
+	 * The state will be stored in the configured state backend.</p>
+	 *
+	 * <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+	 * the moment. For that reason, iterative jobs will not be started if used
+	 * with enabled checkpointing. To override this mechanism, use the 
+	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
+	 *
+	 * @param interval 
+	 *             Time interval between state checkpoints in milliseconds.
+	 * @param mode 
+	 *             The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
+	 */
+	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
+		if (mode == null) {
+			throw new NullPointerException("checkpoint mode must not be null");
+		}
+		if (interval <= 0) {
+			throw new IllegalArgumentException("the checkpoint interval must be positive");
+		}
+		
 		streamGraph.setCheckpointingEnabled(true);
 		streamGraph.setCheckpointingInterval(interval);
+		streamGraph.setCheckpointingMode(mode);
 		return this;
 	}
 	
 	/**
-	 * Method for force-enabling fault-tolerance. Activates monitoring and
-	 * backup of streaming operator states even for jobs containing iterations.
-	 * 
-	 * Please note that the checkpoint/restore guarantees for iterative jobs are
-	 * only best-effort at the moment. Records inside the loops may be lost
-	 * during failure.
-	 * <p/>
-	 * <p/>
-	 * Setting this option assumes that the job is used in production and thus
-	 * if not stated explicitly otherwise with calling with the
-	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
-	 * in case of failure the job will be resubmitted to the cluster
-	 * indefinitely.
+	 * Enables checkpointing for the streaming job. The distributed state of the streaming
+	 * dataflow will be periodically snapshotted. In case of a failure, the streaming
+	 * dataflow will be restarted from the latest completed checkpoint.
+	 *
+	 * <p>The job draws checkpoints periodically, in the given interval. The state will be
+	 * stored in the configured state backend.</p>
+	 *
+	 * <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+	 * the moment. If the "force" parameter is set to true, the system will execute the
+	 * job nonetheless.</p>
 	 * 
 	 * @param interval
-	 *            Time interval between state checkpoints in millis
+	 *            Time interval between state checkpoints in millis.
+	 * @param mode
+	 *            The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
 	 * @param force
-	 *            If true checkpointing will be enabled for iterative jobs as
-	 *            well
+	 *            If true checkpointing will be enabled for iterative jobs as well.
 	 */
 	@Deprecated
-	public StreamExecutionEnvironment enableCheckpointing(long interval, boolean force) {
+	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
+		if (mode == null) {
+			throw new NullPointerException("checkpoint mode must not be null");
+		}
+		if (interval <= 0) {
+			throw new IllegalArgumentException("the checkpoint interval must be positive");
+		}
+		
 		streamGraph.setCheckpointingEnabled(true);
 		streamGraph.setCheckpointingInterval(interval);
+		streamGraph.setCheckpointingMode(mode);
 		if (force) {
 			streamGraph.forceCheckpoint();
 		}
@@ -276,18 +320,22 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
-	 * Method for enabling fault-tolerance. Activates monitoring and backup of
-	 * streaming operator states.
-	 * <p/>
-	 * <p/>
-	 * Setting this option assumes that the job is used in production and thus
-	 * if not stated explicitly otherwise with calling with the
-	 * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
-	 * in case of failure the job will be resubmitted to the cluster
-	 * indefinitely.
+	 * Enables checkpointing for the streaming job. The distributed state of the streaming
+	 * dataflow will be periodically snapshotted. In case of a failure, the streaming
+	 * dataflow will be restarted from the latest completed checkpoint. This method selects
+	 * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
+	 *
+	 * <p>The job draws checkpoints periodically, in the default interval. The state will be
+	 * stored in the configured state backend.</p>
+	 *
+	 * <p>NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+	 * the moment. For that reason, iterative jobs will not be started if used
+	 * with enabled checkpointing. To override this mechanism, use the 
+	 * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
 	 */
 	public StreamExecutionEnvironment enableCheckpointing() {
 		streamGraph.setCheckpointingEnabled(true);
+		streamGraph.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
 		return this;
 	}
 
@@ -323,8 +371,7 @@ public abstract class StreamExecutionEnvironment {
 	 * A value of {@code -1} indicates that the system default value (as defined
 	 * in the configuration) should be used.
 	 *
-	 * @return The number of times the system will try to re-execute failed
-	 * tasks.
+	 * @return The number of times the system will try to re-execute failed tasks.
 	 */
 	public int getNumberOfExecutionRetries() {
 		return config.getNumberOfExecutionRetries();

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 1562f38..62735af 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,13 +68,17 @@ public class StreamConfig implements Serializable {
 	private static final String CHECKPOINTING_ENABLED = "checkpointing";
 	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
 	private static final String STATE_PARTITIONER = "statePartitioner";
-
+	private static final String CHECKPOINT_MODE = "checkpointMode";
+	
+	
 	// ------------------------------------------------------------------------
 	//  Default Values
 	// ------------------------------------------------------------------------
 	
 	private static final long DEFAULT_TIMEOUT = 100;
-
+	private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;
+	
+	
 	// ------------------------------------------------------------------------
 	//  Config
 	// ------------------------------------------------------------------------
@@ -351,6 +355,8 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
+	// --------------------- checkpointing -----------------------
+	
 	public void setCheckpointingEnabled(boolean enabled) {
 		config.setBoolean(CHECKPOINTING_ENABLED, enabled);
 	}
@@ -358,6 +364,20 @@ public class StreamConfig implements Serializable {
 	public boolean isCheckpointingEnabled() {
 		return config.getBoolean(CHECKPOINTING_ENABLED, false);
 	}
+	
+	public void setCheckpointMode(CheckpointingMode mode) {
+		config.setInteger(CHECKPOINT_MODE, mode.ordinal());
+	}
+
+	public CheckpointingMode getCheckpointMode() {
+		int ordinal = config.getInteger(CHECKPOINT_MODE, -1);
+		if (ordinal >= 0) {
+			return CheckpointingMode.values()[ordinal];
+		} else {
+			return DEFAULT_CHECKPOINTING_MODE; 
+		}
+	}
+	
 
 	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
 		try {

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4de5224..65736f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -147,6 +147,8 @@ public class StreamGraph extends StreamingPlan {
 		return checkpointingInterval;
 	}
 
+	// Checkpointing
+	
 	public boolean isChainingEnabled() {
 		return chaining;
 	}
@@ -155,6 +157,15 @@ public class StreamGraph extends StreamingPlan {
 		return checkpointingEnabled;
 	}
 
+	public CheckpointingMode getCheckpointingMode() {
+		return checkpointingMode;
+	}
+
+	public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
+		this.checkpointingMode = checkpointingMode;
+	}
+	
+
 	public boolean isIterative() {
 		return !streamLoops.isEmpty();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5551bf3..5280fb2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -269,10 +270,19 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
+
 		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
-		config.setStateHandleProvider(streamGraph.getStateHandleProvider());
+		if (streamGraph.isCheckpointingEnabled()) {
+			config.setCheckpointMode(streamGraph.getCheckpointingMode());
+			config.setStateHandleProvider(streamGraph.getStateHandleProvider());
+		} else {
+			// the at least once input handler is slightly cheaper (in the absence of checkpoints),
+			// so we use that one if checkpointing is not enabled
+			config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
+		}
 		config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
 
+		
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
 
 		if (vertexClass.equals(StreamIterationHead.class)

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4d60375..f7d7fb0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -79,12 +80,22 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
 								EventListener<CheckpointBarrier> checkpointListener,
+								CheckpointingMode checkpointMode,
 								IOManager ioManager,
 								boolean enableWatermarkMultiplexing) throws IOException {
 		
 		super(InputGateUtil.createInputGate(inputGates));
 
-		this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
+			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		}
+		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
+			this.barrierHandler = new BarrierTracker(inputGate);
+		}
+		else {
+			throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
+		}
+		
 		if (checkpointListener != null) {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 9668c7f..ae97974 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -90,16 +91,25 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
 			EventListener<CheckpointBarrier> checkpointListener,
+			CheckpointingMode checkpointMode,
 			IOManager ioManager,
 			boolean enableWatermarkMultiplexing) throws IOException {
 		
 		super(InputGateUtil.createInputGate(inputGates1, inputGates2));
 
-		this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
+			this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+		}
+		else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
+			this.barrierHandler = new BarrierTracker(inputGate);
+		}
+		else {
+			throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
+		}
+		
 		if (checkpointListener != null) {
 			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
 		}
-
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d078320..605b8f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -44,6 +44,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 				InputGate[] inputGates = getEnvironment().getAllInputGates();
 				inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
 						getCheckpointBarrierListener(), 
+						configuration.getCheckpointMode(),
 						getEnvironment().getIOManager(),
 						getExecutionConfig().areTimestampsEnabled());
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index b4667b2..99c053b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -69,6 +69,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 			this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
 					inputDeserializer1, inputDeserializer2,
 					getCheckpointBarrierListener(),
+					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
 					getExecutionConfig().areTimestampsEnabled());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 2a88a32..0fad3dd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -351,6 +350,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 			coIt.groupBy(1, 2);
 			fail();
 		} catch (UnsupportedOperationException e) {
+			// this is expected
 		}
 
 		DataStream<String> head = coIt
@@ -479,7 +479,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 		// Test force checkpointing
 
 		try {
-			env.enableCheckpointing(1, false);
+			env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
 			env.execute();
 
 			// this statement should never be reached
@@ -488,7 +488,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
 			// expected behaviour
 		}
 
-		env.enableCheckpointing(1, true);
+		env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
 		env.getStreamGraph().getJobGraph();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
new file mode 100644
index 0000000..89679ea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class TranslationTest {
+	
+	@Test
+	public void testCheckpointModeTranslation() {
+		try {
+			// with deactivated fault tolerance, the checkpoint mode should be at-least-once
+			StreamExecutionEnvironment deactivated = getSimpleJob();
+			
+			for (JobVertex vertex : deactivated.getStreamGraph().getJobGraph().getVertices()) {
+				assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
+			}
+
+			// with activated fault tolerance, the checkpoint mode should be by default exactly once
+			StreamExecutionEnvironment activated = getSimpleJob();
+			activated.enableCheckpointing(1000L);
+			for (JobVertex vertex : activated.getStreamGraph().getJobGraph().getVertices()) {
+				assertEquals(CheckpointingMode.EXACTLY_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
+			}
+
+			// explicitly setting the mode
+			StreamExecutionEnvironment explicit = getSimpleJob();
+			explicit.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
+			for (JobVertex vertex : explicit.getStreamGraph().getJobGraph().getVertices()) {
+				assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	private static StreamExecutionEnvironment getSimpleJob() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.generateSequence(1, 10000000)
+				.addSink(new SinkFunction<Long>() {
+					@Override
+					public void invoke(Long value) {}
+				});
+		
+		return env;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 70e652f..e983451 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.api.scala.ClosureCleaner
 import org.apache.flink.runtime.state.StateHandleProvider
+import org.apache.flink.streaming.api.CheckpointingMode
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
 import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
 import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -112,37 +113,80 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
     this
   }
 
+  // ------------------------------------------------------------------------
+  //  Checkpointing Settings
+  // ------------------------------------------------------------------------
   /**
-   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
-   * operator states. Time interval between state checkpoints is specified in in millis.
-   * 
-   * If the force flag is set to true, checkpointing will be enabled for iterative jobs as
-   * well.Please note that the checkpoint/restore guarantees for iterative jobs are
-   * only best-effort at the moment. Records inside the loops may be lost during failure.
+   * Enables checkpointing for the streaming job. The distributed state of the streaming
+   * dataflow will be periodically snapshotted. In case of a failure, the streaming
+   * dataflow will be restarted from the latest completed checkpoint.
    *
-   * Setting this option assumes that the job is used in production and thus if not stated
-   * explicitly otherwise with calling with the
-   * [[setNumberOfExecutionRetries(int)]] method in case of
-   * failure the job will be resubmitted to the cluster indefinitely.
+   * The job draws checkpoints periodically, in the given interval. The state will be
+   * stored in the configured state backend.
+   *
+   * NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+   * the moment. If the "force" parameter is set to true, the system will execute the
+   * job nonetheless.
+   *
+   * @param interval
+   *     Time interval between state checkpoints in millis.
+   * @param mode
+   *     The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
+   * @param force
+   *           If true checkpointing will be enabled for iterative jobs as well.
    */
   @deprecated
-  def enableCheckpointing(interval : Long, force: Boolean) : StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing(interval, force)
+  def enableCheckpointing(interval : Long,
+                          mode: CheckpointingMode,
+                          force: Boolean) : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing(interval, mode, force)
     this
   }
-  
-   /**
-   * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
-   * operator states. Time interval between state checkpoints is specified in in millis.
-   * 
-   * Setting this option assumes that the job is used in production and thus if not stated
-   * explicitly otherwise with calling with the
-   * [[setNumberOfExecutionRetries(int)]] method in case of
-   * failure the job will be resubmitted to the cluster indefinitely.
+
+  /**
+   * Enables checkpointing for the streaming job. The distributed state of the streaming
+   * dataflow will be periodically snapshotted. In case of a failure, the streaming
+   * dataflow will be restarted from the latest completed checkpoint.
+   *
+   * The job draws checkpoints periodically, in the given interval. The system uses the
+   * given [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once").
+   * The state will be stored in the configured state backend.
+   *
+   * NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+   * the moment. For that reason, iterative jobs will not be started if used
+   * with enabled checkpointing. To override this mechanism, use the 
+   * [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
+   *
+   * @param interval 
+   *     Time interval between state checkpoints in milliseconds.
+   * @param mode 
+   *     The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
+   */
+  def enableCheckpointing(interval : Long,
+                          mode: CheckpointingMode) : StreamExecutionEnvironment = {
+    javaEnv.enableCheckpointing(interval, mode)
+    this
+  }
+
+  /**
+   * Enables checkpointing for the streaming job. The distributed state of the streaming
+   * dataflow will be periodically snapshotted. In case of a failure, the streaming
+   * dataflow will be restarted from the latest completed checkpoint.
+   *
+   * The job draws checkpoints periodically, in the given interval. The program will use
+   * [[CheckpointingMode.EXACTLY_ONCE]] mode. The state will be stored in the
+   * configured state backend.
+   *
+   * NOTE: Checkpointing iterative streaming dataflows in not properly supported at
+   * the moment. For that reason, iterative jobs will not be started if used
+   * with enabled checkpointing. To override this mechanism, use the 
+   * [[enableCheckpointing(long, CheckpointingMode, boolean)]] method.
+   *
+   * @param interval 
+   *           Time interval between state checkpoints in milliseconds.
    */
   def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
-    javaEnv.enableCheckpointing(interval)
-    this
+    enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
   }
 
   /**


[2/2] flink git commit: [hotfix] Code cleanups in the StreamConfig

Posted by se...@apache.org.
[hotfix] Code cleanups in the StreamConfig


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/833862a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/833862a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/833862a9

Branch: refs/heads/master
Commit: 833862a999326a8c1b236af0418c7bd3423c7097
Parents: 7bd57d7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 29 14:49:23 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 29 18:43:14 2015 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/graph/StreamConfig.java | 55 +++++++++++++-------
 .../flink/streaming/api/graph/StreamGraph.java  |  8 ++-
 .../api/graph/StreamingJobGraphGenerator.java   |  2 +-
 3 files changed, 44 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index d0e8064..1562f38 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
@@ -37,6 +38,10 @@ public class StreamConfig implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
+	// ------------------------------------------------------------------------
+	//  Config Keys
+	// ------------------------------------------------------------------------
+	
 	private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
 	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
 	private static final String CHAINED_OUTPUTS = "chainedOutputs";
@@ -59,16 +64,22 @@ public class StreamConfig implements Serializable {
 	private static final String EDGES_IN_ORDER = "edgesInOrder";
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
+
+	private static final String CHECKPOINTING_ENABLED = "checkpointing";
 	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
 	private static final String STATE_PARTITIONER = "statePartitioner";
 
-	// DEFAULT VALUES
+	// ------------------------------------------------------------------------
+	//  Default Values
+	// ------------------------------------------------------------------------
+	
 	private static final long DEFAULT_TIMEOUT = 100;
-	public static final String STATE_MONITORING = "STATE_MONITORING";
 
-	// CONFIG METHODS
+	// ------------------------------------------------------------------------
+	//  Config
+	// ------------------------------------------------------------------------
 
-	private Configuration config;
+	private final Configuration config;
 
 	public StreamConfig(Configuration config) {
 		this.config = config;
@@ -78,6 +89,11 @@ public class StreamConfig implements Serializable {
 		return config;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Configured Properties
+	// ------------------------------------------------------------------------
+	
+
 	public void setVertexID(Integer vertexID) {
 		config.setInteger(VERTEX_NAME, vertexID);
 	}
@@ -335,12 +351,12 @@ public class StreamConfig implements Serializable {
 		}
 	}
 
-	public void setStateMonitoring(boolean stateMonitoring) {
-		config.setBoolean(STATE_MONITORING, stateMonitoring);
+	public void setCheckpointingEnabled(boolean enabled) {
+		config.setBoolean(CHECKPOINTING_ENABLED, enabled);
 	}
 
-	public boolean getStateMonitoring() {
-		return config.getBoolean(STATE_MONITORING, false);
+	public boolean isCheckpointingEnabled() {
+		return config.getBoolean(CHECKPOINTING_ENABLED, false);
 	}
 
 	public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
@@ -435,28 +451,29 @@ public class StreamConfig implements Serializable {
 		builder.append("\n=======================");
 		builder.append("Stream Config");
 		builder.append("=======================");
-		builder.append("\nTask name: " + getVertexID());
-		builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
-		builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
-		builder.append("\nOutput names: " + getNonChainedOutputs(cl));
+		builder.append("\nTask name: ").append(getVertexID());
+		builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
+		builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
+		builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
 		builder.append("\nPartitioning:");
 		for (StreamEdge output : getNonChainedOutputs(cl)) {
 			int outputname = output.getTargetId();
-			builder.append("\n\t" + outputname + ": " + output.getPartitioner());
+			builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
 		}
 
-		builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+		builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
 
 		try {
-			builder.append("\nOperator: " + getStreamOperator(cl).getClass().getSimpleName());
-		} catch (Exception e) {
+			builder.append("\nOperator: ").append(getStreamOperator(cl).getClass().getSimpleName());
+		}
+		catch (Exception e) {
 			builder.append("\nOperator: Missing");
 		}
-		builder.append("\nBuffer timeout: " + getBufferTimeout());
-		builder.append("\nState Monitoring: " + getStateMonitoring());
+		builder.append("\nBuffer timeout: ").append(getBufferTimeout());
+		builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
 		if (isChainStart() && getChainedOutputs(cl).size() > 0) {
 			builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
-			builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
+			builder.append(getTransitiveChainedTaskConfigs(cl));
 		}
 
 		return builder.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index f1428b4..4de5224 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -42,6 +42,7 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -66,14 +67,19 @@ import org.slf4j.LoggerFactory;
  */
 public class StreamGraph extends StreamingPlan {
 
+	/** The default interval for checkpoints, in milliseconds */
+	public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
+	
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
+
 	private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
 
 	private final StreamExecutionEnvironment environemnt;
 	private final ExecutionConfig executionConfig;
 
+	private CheckpointingMode checkpointingMode;
 	private boolean checkpointingEnabled = false;
-	private long checkpointingInterval = 5000;
+	private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
 	private boolean chaining = true;
 
 	private Map<Integer, StreamNode> streamNodes;

http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c988150..5551bf3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -269,7 +269,7 @@ public class StreamingJobGraphGenerator {
 		config.setNumberOfOutputs(nonChainableOutputs.size());
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
-		config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
 		config.setStateHandleProvider(streamGraph.getStateHandleProvider());
 		config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());