You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2015/07/29 18:43:56 UTC
[1/2] flink git commit: [FLINK-2407] [streaming] Add an API switch to
choose between "exactly once" and "at least once".
Repository: flink
Updated Branches:
refs/heads/master 7bd57d789 -> b211a6211
[FLINK-2407] [streaming] Add an API switch to choose between "exactly once" and "at least once".
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b211a621
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b211a621
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b211a621
Branch: refs/heads/master
Commit: b211a62111aa3c558586874d0ec5b168e6bb31f1
Parents: 833862a
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 29 14:12:42 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 29 18:43:14 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/CheckpointingMode.java | 75 +++++++++++
.../environment/StreamExecutionEnvironment.java | 125 +++++++++++++------
.../flink/streaming/api/graph/StreamConfig.java | 24 +++-
.../flink/streaming/api/graph/StreamGraph.java | 11 ++
.../api/graph/StreamingJobGraphGenerator.java | 12 +-
.../runtime/io/StreamInputProcessor.java | 13 +-
.../runtime/io/StreamTwoInputProcessor.java | 14 ++-
.../runtime/tasks/OneInputStreamTask.java | 1 +
.../runtime/tasks/TwoInputStreamTask.java | 1 +
.../apache/flink/streaming/api/IterateTest.java | 6 +-
.../flink/streaming/graph/TranslationTest.java | 74 +++++++++++
.../api/scala/StreamExecutionEnvironment.scala | 90 +++++++++----
12 files changed, 375 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
new file mode 100644
index 0000000..db46d00
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/CheckpointingMode.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+/**
+ * The checkpointing mode defines what consistency guarantees the system gives in the presence of
+ * failures.
+ *
+ * <p>When checkpointing is activated, the data streams are replayed such that lost parts of the
+ * processing are repeated. For stateful operations and functions, the checkpointing mode defines
+ * whether the system draws checkpoints such that a recovery behaves as if the operators/functions
+ * see each record "exactly once" ({@link #EXACTLY_ONCE}), or whether the checkpoints are drawn
+ * in a simpler fashion that typically encounters some duplicates upon recovery
+ * ({@link #AT_LEAST_ONCE}).</p>
+ */
+public enum CheckpointingMode {
+
+ /**
+ * Sets the checkpointing mode to "exactly once". This mode means that the system will
+ * checkpoint the operator and user function state in such a way that, upon recovery,
+ * every record will be reflected exactly once in the operator state.
+ *
+ * <p>For example, if a user function counts the number of elements in a stream,
+ * this number will consistently be equal to the number of actual elements in the stream,
+ * regardless of failures and recovery.</p>
+ *
+ * <p>Note that this does not mean that each record flows through the streaming data flow
+ * only once. It means that upon recovery, the state of operators/functions is restored such
+ * that the resumed data streams pick up exactly after the last modification to the state.</p>
+ *
+ * <p>Note that this mode does not guarantee exactly-once behavior in the interaction with
+ * external systems (only state in Flink's operators and user functions). The reason for that
+ * is that a certain level of "collaboration" is required between two systems to achieve
+ * exactly-once guarantees. However, for certain systems, connectors can be written that facilitate
+ * this collaboration.</p>
+ *
+ * <p>This mode sustains high throughput. Depending on the data flow graph and operations,
+ * this mode may increase the record latency, because operators need to align their input
+ * streams in order to create a consistent snapshot point. The latency increase for simple
+ * dataflows (no repartitioning) is negligible. For simple dataflows with repartitioning, the average
+ * latency remains small, but the slowest records typically have an increased latency.</p>
+ */
+ EXACTLY_ONCE,
+
+ /**
+ * Sets the checkpointing mode to "at least once". This mode means that the system will
+ * checkpoint the operator and user function state in a simpler way. Upon failure and recovery,
+ * some records may be reflected multiple times in the operator state.
+ *
+ * <p>For example, if a user function counts the number of elements in a stream,
+ * this number will be equal to, or larger than, the actual number of elements in the stream,
+ * in the presence of failure and recovery.</p>
+ *
+ * <p>This mode has minimal impact on latency and may be preferable in very-low latency
+ * scenarios, where a sustained very-low latency (such as a few milliseconds) is needed,
+ * and where occasional duplicate messages (on recovery) do not matter.</p>
+ */
+ AT_LEAST_ONCE
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 58348e3..fba4e28 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -45,6 +45,7 @@ import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction;
@@ -224,51 +225,94 @@ public abstract class StreamExecutionEnvironment {
return this;
}
+ // ------------------------------------------------------------------------
+ // Checkpointing Settings
+ // ------------------------------------------------------------------------
+
/**
- * Method for enabling fault-tolerance. Activates monitoring and backup of
- * streaming operator states.
- * <p/>
- * <p/>
- * Setting this option assumes that the job is used in production and thus
- * if not stated explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
- * in case of failure the job will be resubmitted to the cluster
- * indefinitely.
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint. This method selects
+ * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
+ *
+ * <p>The job draws checkpoints periodically, in the given interval. The state will be
+ * stored in the configured state backend.</p>
+ *
+ * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
*
- * @param interval
- * Time interval between state checkpoints in millis
+ * @param interval Time interval between state checkpoints in milliseconds.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval) {
+ return enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
+ }
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
+ *
+ * <p>The job draws checkpoints periodically, in the given interval. The system uses the
+ * given {@link CheckpointingMode} for the checkpointing ("exactly once" vs "at least once").
+ * The state will be stored in the configured state backend.</p>
+ *
+ * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
+ *
+ * @param interval
+ * Time interval between state checkpoints in milliseconds.
+ * @param mode
+ * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
+ */
+ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
+ if (mode == null) {
+ throw new NullPointerException("checkpoint mode must not be null");
+ }
+ if (interval <= 0) {
+ throw new IllegalArgumentException("the checkpoint interval must be positive");
+ }
+
streamGraph.setCheckpointingEnabled(true);
streamGraph.setCheckpointingInterval(interval);
+ streamGraph.setCheckpointingMode(mode);
return this;
}
/**
- * Method for force-enabling fault-tolerance. Activates monitoring and
- * backup of streaming operator states even for jobs containing iterations.
- *
- * Please note that the checkpoint/restore guarantees for iterative jobs are
- * only best-effort at the moment. Records inside the loops may be lost
- * during failure.
- * <p/>
- * <p/>
- * Setting this option assumes that the job is used in production and thus
- * if not stated explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
- * in case of failure the job will be resubmitted to the cluster
- * indefinitely.
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
+ *
+ * <p>The job draws checkpoints periodically, in the given interval. The state will be
+ * stored in the configured state backend.</p>
+ *
+ * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. If the "force" parameter is set to true, the system will execute the
+ * job nonetheless.</p>
*
* @param interval
- * Time interval between state checkpoints in millis
+ * Time interval between state checkpoints in milliseconds.
+ * @param mode
+ * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
* @param force
- * If true checkpointing will be enabled for iterative jobs as
- * well
+ * If true, checkpointing will be enabled for iterative jobs as well.
*/
@Deprecated
- public StreamExecutionEnvironment enableCheckpointing(long interval, boolean force) {
+ public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
+ if (mode == null) {
+ throw new NullPointerException("checkpoint mode must not be null");
+ }
+ if (interval <= 0) {
+ throw new IllegalArgumentException("the checkpoint interval must be positive");
+ }
+
streamGraph.setCheckpointingEnabled(true);
streamGraph.setCheckpointingInterval(interval);
+ streamGraph.setCheckpointingMode(mode);
if (force) {
streamGraph.forceCheckpoint();
}
@@ -276,18 +320,22 @@ public abstract class StreamExecutionEnvironment {
}
/**
- * Method for enabling fault-tolerance. Activates monitoring and backup of
- * streaming operator states.
- * <p/>
- * <p/>
- * Setting this option assumes that the job is used in production and thus
- * if not stated explicitly otherwise with calling with the
- * {@link #setNumberOfExecutionRetries(int numberOfExecutionRetries)} method
- * in case of failure the job will be resubmitted to the cluster
- * indefinitely.
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint. This method selects
+ * {@link CheckpointingMode#EXACTLY_ONCE} guarantees.
+ *
+ * <p>The job draws checkpoints periodically, in the default interval. The state will be
+ * stored in the configured state backend.</p>
+ *
+ * <p>NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * {@link #enableCheckpointing(long, CheckpointingMode, boolean)} method.</p>
*/
public StreamExecutionEnvironment enableCheckpointing() {
streamGraph.setCheckpointingEnabled(true);
+ streamGraph.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
return this;
}
@@ -323,8 +371,7 @@ public abstract class StreamExecutionEnvironment {
* A value of {@code -1} indicates that the system default value (as defined
* in the configuration) should be used.
*
- * @return The number of times the system will try to re-execute failed
- * tasks.
+ * @return The number of times the system will try to re-execute failed tasks.
*/
public int getNumberOfExecutionRetries() {
return config.getNumberOfExecutionRetries();
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 1562f38..62735af 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -68,13 +68,17 @@ public class StreamConfig implements Serializable {
private static final String CHECKPOINTING_ENABLED = "checkpointing";
private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
private static final String STATE_PARTITIONER = "statePartitioner";
-
+ private static final String CHECKPOINT_MODE = "checkpointMode";
+
+
// ------------------------------------------------------------------------
// Default Values
// ------------------------------------------------------------------------
private static final long DEFAULT_TIMEOUT = 100;
-
+ private static final CheckpointingMode DEFAULT_CHECKPOINTING_MODE = CheckpointingMode.EXACTLY_ONCE;
+
+
// ------------------------------------------------------------------------
// Config
// ------------------------------------------------------------------------
@@ -351,6 +355,8 @@ public class StreamConfig implements Serializable {
}
}
+ // --------------------- checkpointing -----------------------
+
public void setCheckpointingEnabled(boolean enabled) {
config.setBoolean(CHECKPOINTING_ENABLED, enabled);
}
@@ -358,6 +364,20 @@ public class StreamConfig implements Serializable {
public boolean isCheckpointingEnabled() {
return config.getBoolean(CHECKPOINTING_ENABLED, false);
}
+
+ public void setCheckpointMode(CheckpointingMode mode) {
+ config.setInteger(CHECKPOINT_MODE, mode.ordinal()); // NOTE(review): stored by ordinal, so the encoding depends on the declaration order of CheckpointingMode constants
+ }
+
+ public CheckpointingMode getCheckpointMode() {
+ int ordinal = config.getInteger(CHECKPOINT_MODE, -1); // -1 is the "not set" sentinel
+ if (ordinal >= 0) {
+ return CheckpointingMode.values()[ordinal]; // NOTE(review): no upper-bound check; an out-of-range stored value throws ArrayIndexOutOfBoundsException
+ } else {
+ return DEFAULT_CHECKPOINTING_MODE; // unset: fall back to the default (EXACTLY_ONCE)
+ }
+ }
+
public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
try {
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 4de5224..65736f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -147,6 +147,8 @@ public class StreamGraph extends StreamingPlan {
return checkpointingInterval;
}
+ // Checkpointing
+
public boolean isChainingEnabled() {
return chaining;
}
@@ -155,6 +157,15 @@ public class StreamGraph extends StreamingPlan {
return checkpointingEnabled;
}
+ public CheckpointingMode getCheckpointingMode() {
+ return checkpointingMode;
+ }
+
+ public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
+ this.checkpointingMode = checkpointingMode;
+ }
+
+
public boolean isIterative() {
return !streamLoops.isEmpty();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 5551bf3..5280fb2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -269,10 +270,19 @@ public class StreamingJobGraphGenerator {
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
+
config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
- config.setStateHandleProvider(streamGraph.getStateHandleProvider());
+ if (streamGraph.isCheckpointingEnabled()) {
+ config.setCheckpointMode(streamGraph.getCheckpointingMode());
+ config.setStateHandleProvider(streamGraph.getStateHandleProvider());
+ } else {
+ // the at least once input handler is slightly cheaper (in the absence of checkpoints),
+ // so we use that one if checkpointing is not enabled
+ config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
+ }
config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
+
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
if (vertexClass.equals(StreamIterationHead.class)
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 4d60375..f7d7fb0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -79,12 +80,22 @@ public class StreamInputProcessor<IN> extends AbstractReader implements ReaderBa
@SuppressWarnings("unchecked")
public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
EventListener<CheckpointBarrier> checkpointListener,
+ CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
super(InputGateUtil.createInputGate(inputGates));
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ }
+ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
+ this.barrierHandler = new BarrierTracker(inputGate);
+ }
+ else {
+ throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
+ }
+
if (checkpointListener != null) {
this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 9668c7f..ae97974 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
@@ -90,16 +91,25 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader implements
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
EventListener<CheckpointBarrier> checkpointListener,
+ CheckpointingMode checkpointMode,
IOManager ioManager,
boolean enableWatermarkMultiplexing) throws IOException {
super(InputGateUtil.createInputGate(inputGates1, inputGates2));
- this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
+ this.barrierHandler = new BarrierBuffer(inputGate, ioManager);
+ }
+ else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
+ this.barrierHandler = new BarrierTracker(inputGate);
+ }
+ else {
+ throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
+ }
+
if (checkpointListener != null) {
this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
}
-
if (enableWatermarkMultiplexing) {
MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index d078320..605b8f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -44,6 +44,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
InputGate[] inputGates = getEnvironment().getAllInputGates();
inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
getCheckpointBarrierListener(),
+ configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
getExecutionConfig().areTimestampsEnabled());
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index b4667b2..99c053b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -69,6 +69,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
inputDeserializer1, inputDeserializer2,
getCheckpointBarrierListener(),
+ configuration.getCheckpointMode(),
getEnvironment().getIOManager(),
getExecutionConfig().areTimestampsEnabled());
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 2a88a32..0fad3dd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -27,7 +27,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
@@ -351,6 +350,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
coIt.groupBy(1, 2);
fail();
} catch (UnsupportedOperationException e) {
+ // this is expected
}
DataStream<String> head = coIt
@@ -479,7 +479,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// Test force checkpointing
try {
- env.enableCheckpointing(1, false);
+ env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, false);
env.execute();
// this statement should never be reached
@@ -488,7 +488,7 @@ public class IterateTest extends StreamingMultipleProgramsTestBase {
// expected behaviour
}
- env.enableCheckpointing(1, true);
+ env.enableCheckpointing(1, CheckpointingMode.EXACTLY_ONCE, true);
env.getStreamGraph().getJobGraph();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
new file mode 100644
index 0000000..89679ea
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/graph/TranslationTest.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+@SuppressWarnings("serial")
+public class TranslationTest { // verifies that the CheckpointingMode is written into each job vertex's StreamConfig
+
+ @Test
+ public void testCheckpointModeTranslation() {
+ try {
+ // without enableCheckpointing(), every job vertex should be configured as AT_LEAST_ONCE
+ StreamExecutionEnvironment deactivated = getSimpleJob();
+
+ for (JobVertex vertex : deactivated.getStreamGraph().getJobGraph().getVertices()) {
+ assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
+ }
+
+ // enableCheckpointing(interval) without an explicit mode should default to EXACTLY_ONCE
+ StreamExecutionEnvironment activated = getSimpleJob();
+ activated.enableCheckpointing(1000L);
+ for (JobVertex vertex : activated.getStreamGraph().getJobGraph().getVertices()) {
+ assertEquals(CheckpointingMode.EXACTLY_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
+ }
+
+ // an explicitly requested mode must reach every vertex's StreamConfig unchanged
+ StreamExecutionEnvironment explicit = getSimpleJob();
+ explicit.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
+ for (JobVertex vertex : explicit.getStreamGraph().getJobGraph().getVertices()) {
+ assertEquals(CheckpointingMode.AT_LEAST_ONCE, new StreamConfig(vertex.getConfiguration()).getCheckpointMode());
+ }
+ }
+ catch (Exception e) { // NOTE(review): declaring "throws Exception" instead would keep the full stack trace in the test report
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static StreamExecutionEnvironment getSimpleJob() { // builds a minimal sequence-source -> sink job; callers only translate it, never execute it
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.generateSequence(1, 10000000)
+ .addSink(new SinkFunction<Long>() {
+ @Override
+ public void invoke(Long value) {}
+ });
+
+ return env;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b211a621/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 70e652f..e983451 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.api.scala.ClosureCleaner
import org.apache.flink.runtime.state.StateHandleProvider
+import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaEnv}
import org.apache.flink.streaming.api.functions.source.FileMonitoringFunction.WatchType
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
@@ -112,37 +113,80 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
this
}
+ // ------------------------------------------------------------------------
+ // Checkpointing Settings
+ // ------------------------------------------------------------------------
/**
- * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
- * operator states. Time interval between state checkpoints is specified in in millis.
- *
- * If the force flag is set to true, checkpointing will be enabled for iterative jobs as
- * well.Please note that the checkpoint/restore guarantees for iterative jobs are
- * only best-effort at the moment. Records inside the loops may be lost during failure.
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
*
- * Setting this option assumes that the job is used in production and thus if not stated
- * explicitly otherwise with calling with the
- * [[setNumberOfExecutionRetries(int)]] method in case of
- * failure the job will be resubmitted to the cluster indefinitely.
+ * The job draws checkpoints periodically, in the given interval. The state will be
+ * stored in the configured state backend.
+ *
+ * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. If the "force" parameter is set to true, the system will execute the
+ * job nonetheless.
+ *
+ * @param interval
+ * Time interval between state checkpoints in milliseconds.
+ * @param mode
+ * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
+ * @param force
+ * If true, checkpointing will be enabled for iterative jobs as well.
*/
@deprecated
- def enableCheckpointing(interval : Long, force: Boolean) : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing(interval, force)
+ def enableCheckpointing(interval : Long,
+ mode: CheckpointingMode,
+ force: Boolean) : StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing(interval, mode, force)
this
}
-
- /**
- * Method for enabling fault-tolerance. Activates monitoring and backup of streaming
- * operator states. Time interval between state checkpoints is specified in in millis.
- *
- * Setting this option assumes that the job is used in production and thus if not stated
- * explicitly otherwise with calling with the
- * [[setNumberOfExecutionRetries(int)]] method in case of
- * failure the job will be resubmitted to the cluster indefinitely.
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
+ *
+ * The job draws checkpoints periodically, in the given interval. The system uses the
+ * given [[CheckpointingMode]] for the checkpointing ("exactly once" vs "at least once").
+ * The state will be stored in the configured state backend.
+ *
+ * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * `enableCheckpointing(Long, CheckpointingMode, Boolean)` method.
+ *
+ * @param interval
+ * Time interval between state checkpoints in milliseconds.
+ * @param mode
+ * The checkpointing mode, selecting between "exactly once" and "at least once" guarantees.
+ */
+ def enableCheckpointing(interval : Long,
+ mode: CheckpointingMode) : StreamExecutionEnvironment = {
+ javaEnv.enableCheckpointing(interval, mode)
+ this
+ }
+
+ /**
+ * Enables checkpointing for the streaming job. The distributed state of the streaming
+ * dataflow will be periodically snapshotted. In case of a failure, the streaming
+ * dataflow will be restarted from the latest completed checkpoint.
+ *
+ * The job draws checkpoints periodically, in the given interval. The program will use
+ * [[CheckpointingMode.EXACTLY_ONCE]] mode. The state will be stored in the
+ * configured state backend.
+ *
+ * NOTE: Checkpointing iterative streaming dataflows is not properly supported at
+ * the moment. For that reason, iterative jobs will not be started if used
+ * with enabled checkpointing. To override this mechanism, use the
+ * `enableCheckpointing(Long, CheckpointingMode, Boolean)` method.
+ *
+ * @param interval
+ * Time interval between state checkpoints in milliseconds.
*/
def enableCheckpointing(interval : Long) : StreamExecutionEnvironment = {
- javaEnv.enableCheckpointing(interval)
- this
+ enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE)
}
/**
[2/2] flink git commit: [hotfix] Code cleanups in the StreamConfig
Posted by se...@apache.org.
[hotfix] Code cleanups in the StreamConfig
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/833862a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/833862a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/833862a9
Branch: refs/heads/master
Commit: 833862a999326a8c1b236af0418c7bd3423c7097
Parents: 7bd57d7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Jul 29 14:49:23 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Jul 29 18:43:14 2015 +0200
----------------------------------------------------------------------
.../flink/streaming/api/graph/StreamConfig.java | 55 +++++++++++++-------
.../flink/streaming/api/graph/StreamGraph.java | 8 ++-
.../api/graph/StreamingJobGraphGenerator.java | 2 +-
3 files changed, 44 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index d0e8064..1562f38 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
@@ -37,6 +38,10 @@ public class StreamConfig implements Serializable {
private static final long serialVersionUID = 1L;
+ // ------------------------------------------------------------------------
+ // Config Keys
+ // ------------------------------------------------------------------------
+
private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
private static final String NUMBER_OF_INPUTS = "numberOfInputs";
private static final String CHAINED_OUTPUTS = "chainedOutputs";
@@ -59,16 +64,22 @@ public class StreamConfig implements Serializable {
private static final String EDGES_IN_ORDER = "edgesInOrder";
private static final String OUT_STREAM_EDGES = "outStreamEdges";
private static final String IN_STREAM_EDGES = "inStreamEdges";
+
+ private static final String CHECKPOINTING_ENABLED = "checkpointing";
private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
private static final String STATE_PARTITIONER = "statePartitioner";
- // DEFAULT VALUES
+ // ------------------------------------------------------------------------
+ // Default Values
+ // ------------------------------------------------------------------------
+
private static final long DEFAULT_TIMEOUT = 100;
- public static final String STATE_MONITORING = "STATE_MONITORING";
- // CONFIG METHODS
+ // ------------------------------------------------------------------------
+ // Config
+ // ------------------------------------------------------------------------
- private Configuration config;
+ private final Configuration config;
public StreamConfig(Configuration config) {
this.config = config;
@@ -78,6 +89,11 @@ public class StreamConfig implements Serializable {
return config;
}
+ // ------------------------------------------------------------------------
+ // Configured Properties
+ // ------------------------------------------------------------------------
+
+
public void setVertexID(Integer vertexID) {
config.setInteger(VERTEX_NAME, vertexID);
}
@@ -335,12 +351,12 @@ public class StreamConfig implements Serializable {
}
}
- public void setStateMonitoring(boolean stateMonitoring) {
- config.setBoolean(STATE_MONITORING, stateMonitoring);
+ public void setCheckpointingEnabled(boolean enabled) {
+ config.setBoolean(CHECKPOINTING_ENABLED, enabled);
}
- public boolean getStateMonitoring() {
- return config.getBoolean(STATE_MONITORING, false);
+ public boolean isCheckpointingEnabled() {
+ return config.getBoolean(CHECKPOINTING_ENABLED, false);
}
public void setOutEdgesInOrder(List<StreamEdge> outEdgeList) {
@@ -435,28 +451,29 @@ public class StreamConfig implements Serializable {
builder.append("\n=======================");
builder.append("Stream Config");
builder.append("=======================");
- builder.append("\nTask name: " + getVertexID());
- builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
- builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
- builder.append("\nOutput names: " + getNonChainedOutputs(cl));
+ builder.append("\nTask name: ").append(getVertexID());
+ builder.append("\nNumber of non-chained inputs: ").append(getNumberOfInputs());
+ builder.append("\nNumber of non-chained outputs: ").append(getNumberOfOutputs());
+ builder.append("\nOutput names: ").append(getNonChainedOutputs(cl));
builder.append("\nPartitioning:");
for (StreamEdge output : getNonChainedOutputs(cl)) {
int outputname = output.getTargetId();
- builder.append("\n\t" + outputname + ": " + output.getPartitioner());
+ builder.append("\n\t").append(outputname).append(": ").append(output.getPartitioner());
}
- builder.append("\nChained subtasks: " + getChainedOutputs(cl));
+ builder.append("\nChained subtasks: ").append(getChainedOutputs(cl));
try {
- builder.append("\nOperator: " + getStreamOperator(cl).getClass().getSimpleName());
- } catch (Exception e) {
+ builder.append("\nOperator: ").append(getStreamOperator(cl).getClass().getSimpleName());
+ }
+ catch (Exception e) {
builder.append("\nOperator: Missing");
}
- builder.append("\nBuffer timeout: " + getBufferTimeout());
- builder.append("\nState Monitoring: " + getStateMonitoring());
+ builder.append("\nBuffer timeout: ").append(getBufferTimeout());
+ builder.append("\nState Monitoring: ").append(isCheckpointingEnabled());
if (isChainStart() && getChainedOutputs(cl).size() > 0) {
builder.append("\n\n\n---------------------\nChained task configs\n---------------------\n");
- builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
+ builder.append(getTransitiveChainedTaskConfigs(cl));
}
return builder.toString();
http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index f1428b4..4de5224 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -42,6 +42,7 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -66,14 +67,19 @@ import org.slf4j.LoggerFactory;
*/
public class StreamGraph extends StreamingPlan {
+ /** The default interval for checkpoints, in milliseconds */
+ public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
+
private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
+
private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
private final StreamExecutionEnvironment environemnt;
private final ExecutionConfig executionConfig;
+ private CheckpointingMode checkpointingMode;
private boolean checkpointingEnabled = false;
- private long checkpointingInterval = 5000;
+ private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
private boolean chaining = true;
private Map<Integer, StreamNode> streamNodes;
http://git-wip-us.apache.org/repos/asf/flink/blob/833862a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c988150..5551bf3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -269,7 +269,7 @@ public class StreamingJobGraphGenerator {
config.setNumberOfOutputs(nonChainableOutputs.size());
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
- config.setStateMonitoring(streamGraph.isCheckpointingEnabled());
+ config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
config.setStateHandleProvider(streamGraph.getStateHandleProvider());
config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());