You are viewing a plain-text version of this content. (The canonical link to the original was not preserved in this copy.)
Posted to commits@flink.apache.org by se...@apache.org on 2015/11/26 18:48:45 UTC
[1/2] flink git commit: [hotfix] Java-7-ify the ExecutionConfig class
Repository: flink
Updated Branches:
refs/heads/master dcce6def9 -> 55fd5f32d
[hotfix] Java-7-ify the ExecutionConfig class
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4097666e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4097666e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4097666e
Branch: refs/heads/master
Commit: 4097666ef3bf595e20fef206e07aa0251fe7eb35
Parents: dcce6de
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 19 18:03:55 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 26 10:51:43 2015 +0100
----------------------------------------------------------------------
.../org/apache/flink/api/common/ExecutionConfig.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4097666e/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index b031441..a0d3363 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -88,7 +88,7 @@ public class ExecutionConfig implements Serializable {
/** If set to true, progress updates are printed to System.out during execution */
private boolean printProgressDuringExecution = true;
- private GlobalJobParameters globalJobParameters = null;
+ private GlobalJobParameters globalJobParameters;
private long autoWatermarkInterval = 0;
@@ -99,17 +99,17 @@ public class ExecutionConfig implements Serializable {
// Serializers and types registered with Kryo and the PojoSerializer
// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
- private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<Class<?>, SerializableSerializer<?>>();
+ private final LinkedHashMap<Class<?>, SerializableSerializer<?>> registeredTypesWithKryoSerializers = new LinkedHashMap<>();
- private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>();
+ private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> registeredTypesWithKryoSerializerClasses = new LinkedHashMap<>();
- private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<Class<?>, SerializableSerializer<?>>();
+ private final LinkedHashMap<Class<?>, SerializableSerializer<?>> defaultKryoSerializers = new LinkedHashMap<>();
- private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<Class<?>, Class<? extends Serializer<?>>>();
+ private final LinkedHashMap<Class<?>, Class<? extends Serializer<?>>> defaultKryoSerializerClasses = new LinkedHashMap<>();
- private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<Class<?>>();
+ private final LinkedHashSet<Class<?>> registeredKryoTypes = new LinkedHashSet<>();
- private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<Class<?>>();
+ private final LinkedHashSet<Class<?>> registeredPojoTypes = new LinkedHashSet<>();
// --------------------------------------------------------------------------------------------
[2/2] flink git commit: [FLINK-3051] [streaming] Add mechanisms to control the maximum number of concurrent checkpoints
Posted by se...@apache.org.
[FLINK-3051] [streaming] Add mechanisms to control the maximum number of concurrent checkpoints
This closes #1408
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55fd5f32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55fd5f32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55fd5f32
Branch: refs/heads/master
Commit: 55fd5f32d7ef0292a01192ab08456fae49b91791
Parents: 4097666
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 19 19:05:47 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 26 17:16:29 2015 +0100
----------------------------------------------------------------------
.../checkpoint/CheckpointCoordinator.java | 402 ++++++++++++-------
.../CheckpointCoordinatorDeActivator.java | 11 +-
.../runtime/checkpoint/PendingCheckpoint.java | 10 +-
.../runtime/executiongraph/ExecutionGraph.java | 22 +-
.../jobgraph/tasks/JobSnapshottingSettings.java | 55 ++-
.../flink/runtime/jobmanager/JobManager.scala | 2 +
.../checkpoint/CheckpointCoordinatorTest.java | 357 +++++++++++++++-
.../checkpoint/CheckpointStateRestoreTest.java | 6 +-
.../checkpoint/CoordinatorShutdownTest.java | 6 +-
.../api/environment/CheckpointConfig.java | 221 ++++++++++
.../environment/StreamExecutionEnvironment.java | 61 +--
.../flink/streaming/api/graph/StreamGraph.java | 105 ++---
.../api/graph/StreamGraphGenerator.java | 11 -
.../api/graph/StreamingJobGraphGenerator.java | 36 +-
.../api/scala/StreamExecutionEnvironment.scala | 7 +
15 files changed, 990 insertions(+), 322 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 09dd2d9..454b88a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
+
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,7 +48,6 @@ import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -63,11 +64,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
*/
public class CheckpointCoordinator {
- private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+ static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
/** The number of recent checkpoints whose IDs are remembered */
private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
+
/** Coordinator-wide lock to safeguard the checkpoint updates */
private final Object lock = new Object();
@@ -83,35 +85,58 @@ public class CheckpointCoordinator {
/** Tasks who need to be sent a message when a checkpoint is confirmed */
private final ExecutionVertex[] tasksToCommitTo;
+ /** Map from checkpoint ID to the pending checkpoint */
private final Map<Long, PendingCheckpoint> pendingCheckpoints;
- /**
- * Completed checkpoints. Implementations can be blocking. Make sure calls to methods
- * accessing this don't block the job manager actor and run asynchronously.
- */
+ /** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
+ * accessing this don't block the job manager actor and run asynchronously. */
private final CompletedCheckpointStore completedCheckpointStore;
+ /** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
private final ArrayDeque<Long> recentPendingCheckpoints;
- /**
- * Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
- * need to be ascending across job managers.
- */
+ /** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
+ * need to be ascending across job managers. */
private final CheckpointIDCounter checkpointIdCounter;
- private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
+ /** Class loader used to deserialize the state handles (as they may be user-defined) */
+ private final ClassLoader userClassLoader;
- /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
- private final Timer timer;
+ /** The base checkpoint interval. Actual trigger time may be affected by the
+ * max concurrent checkpoints and minimum-pause values */
+ private final long baseInterval;
+ /** The max time (in ms) that a checkpoint may take */
private final long checkpointTimeout;
+
+ /** The min time(in ms) to delay after a checkpoint could be triggered. Allows to
+ * enforce minimum processing time between checkpoint attempts */
+ private final long minPauseBetweenCheckpoints;
+
+ /** The maximum number of checkpoints that may be in progress at the same time */
+ private final int maxConcurrentCheckpointAttempts;
- private TimerTask periodicScheduler;
+ /** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
+ private final Timer timer;
+ /** Actor that receives status updates from the execution graph this coordinator works for */
private ActorGateway jobStatusListener;
+
+ /** The number of consecutive failed trigger attempts */
+ private int numUnsuccessfulCheckpointsTriggers;
+
+
+ private ScheduledTrigger currentPeriodicTrigger;
+
+ /** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
+ * Non-volatile, because only accessed in synchronized scope */
+ private boolean periodicScheduling;
- private ClassLoader userClassLoader;
+ /** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
+ * accessed in synchronized scope */
+ private boolean triggerRequestQueued;
+ /** Flag marking the coordinator as shut down (not accepting any messages any more) */
private volatile boolean shutdown;
/** Shutdown hook thread to clean up state handles. */
@@ -121,6 +146,7 @@ public class CheckpointCoordinator {
public CheckpointCoordinator(
JobID job,
+ long baseInterval,
long checkpointTimeout,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
@@ -130,11 +156,36 @@ public class CheckpointCoordinator {
CompletedCheckpointStore completedCheckpointStore,
RecoveryMode recoveryMode) throws Exception {
+ this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE,
+ tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+ userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode);
+ }
+
+ public CheckpointCoordinator(
+ JobID job,
+ long baseInterval,
+ long checkpointTimeout,
+ long minPauseBetweenCheckpoints,
+ int maxConcurrentCheckpointAttempts,
+ ExecutionVertex[] tasksToTrigger,
+ ExecutionVertex[] tasksToWaitFor,
+ ExecutionVertex[] tasksToCommitTo,
+ ClassLoader userClassLoader,
+ CheckpointIDCounter checkpointIDCounter,
+ CompletedCheckpointStore completedCheckpointStore,
+ RecoveryMode recoveryMode) throws Exception {
+
// Sanity check
+ checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
+ checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
+ checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
this.job = checkNotNull(job);
+ this.baseInterval = baseInterval;
this.checkpointTimeout = checkpointTimeout;
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
this.tasksToTrigger = checkNotNull(tasksToTrigger);
this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -143,10 +194,11 @@ public class CheckpointCoordinator {
this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
this.userClassLoader = userClassLoader;
this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+
checkpointIDCounter.start();
this.timer = new Timer("Checkpoint Timer", true);
-
+
if (recoveryMode == RecoveryMode.STANDALONE) {
// Add shutdown hook to clean up state handles when no checkpoint recovery is
// possible. In case of another configured recovery mode, the checkpoints need to be
@@ -158,7 +210,7 @@ public class CheckpointCoordinator {
CheckpointCoordinator.this.shutdown();
}
catch (Throwable t) {
- LOG.error("Error during shutdown of checkpoint coordniator via " +
+ LOG.error("Error during shutdown of checkpoint coordinator via " +
"JVM shutdown hook: " + t.getMessage(), t);
}
}
@@ -197,7 +249,10 @@ public class CheckpointCoordinator {
shutdown = true;
LOG.info("Stopping checkpoint coordinator for job " + job);
- // shut down the thread that handles the timeouts
+ periodicScheduling = false;
+ triggerRequestQueued = false;
+
+ // shut down the thread that handles the timeouts and pending triggers
timer.cancel();
// make sure that the actor does not linger
@@ -206,17 +261,11 @@ public class CheckpointCoordinator {
jobStatusListener = null;
}
- // the scheduling thread needs also to go away
- if (periodicScheduler != null) {
- periodicScheduler.cancel();
- periodicScheduler = null;
- }
-
checkpointIdCounter.stop();
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : pendingCheckpoints.values()) {
- pending.discard(userClassLoader, true);
+ pending.discard(userClassLoader);
}
pendingCheckpoints.clear();
@@ -235,7 +284,7 @@ public class CheckpointCoordinator {
// race, JVM is in shutdown already, we can safely ignore this
}
catch (Throwable t) {
- LOG.warn("Error unregistering checkpoint cooordniator shutdown hook.", t);
+ LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t);
}
}
}
@@ -251,93 +300,136 @@ public class CheckpointCoordinator {
// --------------------------------------------------------------------------------------------
/**
- * Triggers a new checkpoint and uses the current system time as the
- * checkpoint time.
- */
- public void triggerCheckpoint() throws Exception {
- triggerCheckpoint(System.currentTimeMillis());
- }
-
- /**
* Triggers a new checkpoint and uses the given timestamp as the checkpoint
* timestamp.
*
* @param timestamp The timestamp for the checkpoint.
*/
- public boolean triggerCheckpoint(final long timestamp) throws Exception {
- if (shutdown) {
- LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
- return false;
- }
-
- final long checkpointID = checkpointIdCounter.getAndIncrement();
- LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
-
- try {
- // first check if all tasks that we need to trigger are running.
- // if not, abort the checkpoint
- ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
- for (int i = 0; i < tasksToTrigger.length; i++) {
- Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
- if (ee != null && ee.getState() == ExecutionState.RUNNING) {
- triggerIDs[i] = ee.getAttemptId();
- } else {
- LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
- tasksToTrigger[i].getSimpleName());
- return false;
- }
+ public boolean triggerCheckpoint(long timestamp) throws Exception {
+ // make some eager pre-checks
+ synchronized (lock) {
+ // abort if the coordinator has been shutdown in the meantime
+ if (shutdown) {
+ return false;
+ }
+
+ // sanity check: there should never be more than one trigger request queued
+ if (triggerRequestQueued) {
+ LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ return false;
}
- // next, check if all tasks that need to acknowledge the checkpoint are running.
- // if not, abort the checkpoint
- Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
- new HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length);
-
- for (ExecutionVertex ev : tasksToWaitFor) {
- Execution ee = ev.getCurrentExecutionAttempt();
- if (ee != null) {
- ackTasks.put(ee.getAttemptId(), ev);
- } else {
- LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
- ev.getSimpleName());
- return false;
+ // if too many checkpoints are currently in progress, we need to mark that a request is queued
+ if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+ triggerRequestQueued = true;
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger = null;
}
+ return false;
}
-
- // register a new pending checkpoint. this makes sure we can properly receive acknowledgements
- final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+ }
- // schedule the timer that will clean up the expired checkpoints
- TimerTask canceller = new TimerTask() {
- @Override
- public void run() {
- try {
- synchronized (lock) {
- // only do the work if the checkpoint is not discarded anyways
- // note that checkpoint completion discards the pending checkpoint object
- if (!checkpoint.isDiscarded()) {
- LOG.info("Checkpoint " + checkpointID + " expired before completing.");
-
- checkpoint.discard(userClassLoader, true);
-
- pendingCheckpoints.remove(checkpointID);
- rememberRecentCheckpointId(checkpointID);
- }
+ // first check if all tasks that we need to trigger are running.
+ // if not, abort the checkpoint
+ ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
+ for (int i = 0; i < tasksToTrigger.length; i++) {
+ Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
+ if (ee != null && ee.getState() == ExecutionState.RUNNING) {
+ triggerIDs[i] = ee.getAttemptId();
+ } else {
+ LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
+ tasksToTrigger[i].getSimpleName());
+ return false;
+ }
+ }
+
+ // next, check if all tasks that need to acknowledge the checkpoint are running.
+ // if not, abort the checkpoint
+ Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
+
+ for (ExecutionVertex ev : tasksToWaitFor) {
+ Execution ee = ev.getCurrentExecutionAttempt();
+ if (ee != null) {
+ ackTasks.put(ee.getAttemptId(), ev);
+ } else {
+ LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
+ ev.getSimpleName());
+ return false;
+ }
+ }
+
+ // we will actually trigger this checkpoint!
+
+ final long checkpointID;
+ try {
+ // this must happen outside the locked scope, because it communicates
+ // with external services (in HA mode) and may block for a while.
+ checkpointID = checkpointIdCounter.getAndIncrement();
+ }
+ catch (Throwable t) {
+ int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+ LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+ return false;
+ }
+
+ LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+
+ final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+
+ // schedule the timer that will clean up the expired checkpoints
+ TimerTask canceller = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ synchronized (lock) {
+ // only do the work if the checkpoint is not discarded anyways
+ // note that checkpoint completion discards the pending checkpoint object
+ if (!checkpoint.isDiscarded()) {
+ LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+
+ checkpoint.discard(userClassLoader);
+ pendingCheckpoints.remove(checkpointID);
+ rememberRecentCheckpointId(checkpointID);
+
+ triggerQueuedRequests();
}
}
- catch (Throwable t) {
- LOG.error("Exception while handling checkpoint timeout", t);
- }
}
- };
-
+ catch (Throwable t) {
+ LOG.error("Exception while handling checkpoint timeout", t);
+ }
+ }
+ };
+
+ try {
+ // re-acquire the lock
synchronized (lock) {
+ // since we released the lock in the meantime, we need to re-check
+ // that the conditions still hold. this is clumsy, but it allows us to
+ // release the lock in the meantime while calls to external services are
+ // blocking progress, and still gives us early checks that skip work
+ // if no checkpoint can happen anyways
if (shutdown) {
- throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
+ return false;
+ }
+ else if (triggerRequestQueued) {
+ LOG.warn("Trying to trigger another checkpoint while one was queued already");
+ return false;
+ }
+ else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+ triggerRequestQueued = true;
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger = null;
+ }
+ return false;
}
+
pendingCheckpoints.put(checkpointID, checkpoint);
timer.schedule(canceller, checkpointTimeout);
}
+ // end of lock scope
// send the messages to the tasks that trigger their checkpoint
for (int i = 0; i < tasksToTrigger.length; i++) {
@@ -345,21 +437,21 @@ public class CheckpointCoordinator {
TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
}
-
- numUnsuccessfulCheckpointsTriggers.set(0);
+
+ numUnsuccessfulCheckpointsTriggers = 0;
return true;
}
catch (Throwable t) {
- int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
- LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
-
+ // guard the map against concurrent modifications
synchronized (lock) {
- PendingCheckpoint checkpoint = pendingCheckpoints.remove(checkpointID);
- if (checkpoint != null && !checkpoint.isDiscarded()) {
- checkpoint.discard(userClassLoader, true);
- }
+ pendingCheckpoints.remove(checkpointID);
}
+ int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+ LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+ if (!checkpoint.isDiscarded()) {
+ checkpoint.discard(userClassLoader);
+ }
return false;
}
}
@@ -401,6 +493,8 @@ public class CheckpointCoordinator {
rememberRecentCheckpointId(checkpointId);
dropSubsumedCheckpoints(completed.getTimestamp());
+
+ triggerQueuedRequests();
}
}
else {
@@ -455,13 +549,38 @@ public class CheckpointCoordinator {
if (p.getCheckpointTimestamp() < timestamp) {
rememberRecentCheckpointId(p.getCheckpointId());
- p.discard(userClassLoader, true);
+ p.discard(userClassLoader);
entries.remove();
}
}
}
+ /**
+ * Triggers the queued request, if there is one.
+ *
+ * <p>NOTE: The caller of this method must hold the lock when invoking the method!
+ */
+ private void triggerQueuedRequests() throws Exception {
+ if (triggerRequestQueued) {
+ triggerRequestQueued = false;
+
+ // trigger the checkpoint from the trigger timer, to finish the work of this thread before
+ // starting with the next checkpoint
+ ScheduledTrigger trigger = new ScheduledTrigger();
+ if (periodicScheduling) {
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ }
+ currentPeriodicTrigger = trigger;
+ timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+ }
+ else {
+ timer.schedule(trigger, 0L);
+ }
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------
@@ -557,63 +676,74 @@ public class CheckpointCoordinator {
// Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
- public void startPeriodicCheckpointScheduler(long interval) {
+ public void startCheckpointScheduler() {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
- // cancel any previous scheduler
- stopPeriodicCheckpointScheduler();
+ // make sure all prior timers are cancelled
+ stopCheckpointScheduler();
- // start a new scheduler
- periodicScheduler = new TimerTask() {
- @Override
- public void run() {
- try {
- triggerCheckpoint();
- }
- catch (Exception e) {
- LOG.error("Exception while triggering checkpoint", e);
- }
- }
- };
- timer.scheduleAtFixedRate(periodicScheduler, interval, interval);
+ periodicScheduling = true;
+ currentPeriodicTrigger = new ScheduledTrigger();
+ timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
}
}
- public void stopPeriodicCheckpointScheduler() {
+ public void stopCheckpointScheduler() {
synchronized (lock) {
- if (periodicScheduler != null) {
- periodicScheduler.cancel();
- periodicScheduler = null;
+ triggerRequestQueued = false;
+ periodicScheduling = false;
+
+ if (currentPeriodicTrigger != null) {
+ currentPeriodicTrigger.cancel();
+ currentPeriodicTrigger = null;
+ }
+
+ for (PendingCheckpoint p : pendingCheckpoints.values()) {
+ p.discard(userClassLoader);
}
+ pendingCheckpoints.clear();
+
+ numUnsuccessfulCheckpointsTriggers = 0;
}
}
- public ActorGateway createJobStatusListener(
- ActorSystem actorSystem,
- long checkpointInterval,
- UUID leaderSessionID) {
+ // ------------------------------------------------------------------------
+ // job status listener that schedules / cancels periodic checkpoints
+ // ------------------------------------------------------------------------
+
+ public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
+
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}
if (jobStatusListener == null) {
- Props props = Props.create(
- CheckpointCoordinatorDeActivator.class,
- this,
- checkpointInterval,
- leaderSessionID);
+ Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
// wrap the ActorRef in a AkkaActorGateway to support message decoration
- jobStatusListener = new AkkaActorGateway(
- actorSystem.actorOf(props),
- leaderSessionID);
+ jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
}
return jobStatusListener;
}
}
+
+ // ------------------------------------------------------------------------
+
+ private class ScheduledTrigger extends TimerTask {
+
+ @Override
+ public void run() {
+ try {
+ triggerCheckpoint(System.currentTimeMillis());
+ }
+ catch (Exception e) {
+ LOG.error("Exception while triggering checkpoint", e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 7e32b72..8bdab7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -32,19 +32,15 @@ import java.util.UUID;
public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
private final CheckpointCoordinator coordinator;
- private final long interval;
private final UUID leaderSessionID;
public CheckpointCoordinatorDeActivator(
CheckpointCoordinator coordinator,
- long interval,
UUID leaderSessionID) {
LOG.info("Create CheckpointCoordinatorDeActivator");
this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-
- this.interval = interval;
this.leaderSessionID = leaderSessionID;
}
@@ -55,11 +51,10 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
if (status == JobStatus.RUNNING) {
// start the checkpoint scheduler
- coordinator.startPeriodicCheckpointScheduler(interval);
- }
- else {
+ coordinator.startCheckpointScheduler();
+ } else {
// anything else should stop the trigger for now
- coordinator.stopPeriodicCheckpointScheduler();
+ coordinator.stopCheckpointScheduler();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 19c65d4..b94e5bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -117,7 +117,7 @@ public class PendingCheckpoint {
if (notYetAcknowledgedTasks.isEmpty()) {
CompletedCheckpoint completed = new CompletedCheckpoint(jobId, checkpointId,
checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
- discard(null, false);
+ dispose(null, false);
return completed;
}
@@ -150,11 +150,15 @@ public class PendingCheckpoint {
/**
* Discards the pending checkpoint, releasing all held resources.
*/
- public void discard(ClassLoader userClassLoader, boolean discardStateHandle) {
+ public void discard(ClassLoader userClassLoader) {
+ dispose(userClassLoader, true);
+ }
+
+ private void dispose(ClassLoader userClassLoader, boolean releaseState) {
synchronized (lock) {
discarded = true;
numAcknowledgedTasks = -1;
- if (discardStateHandle) {
+ if (releaseState) {
for (StateForTask state : collectedStates) {
state.discard(userClassLoader);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d10aac1..9218fe4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -182,9 +182,6 @@ public class ExecutionGraph implements Serializable {
* from results than need to be materialized. */
private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
- /** Flag that indicate whether the executed dataflow should be periodically snapshotted */
- private boolean snapshotCheckpointsEnabled;
-
/** Flag to indicate whether the Graph has been archived */
private boolean isArchived = false;
@@ -341,9 +338,12 @@ public class ExecutionGraph implements Serializable {
public boolean isArchived() {
return isArchived;
}
+
public void enableSnapshotCheckpointing(
long interval,
long checkpointTimeout,
+ long minPauseBetweenCheckpoints,
+ int maxConcurrentCheckpoints,
List<ExecutionJobVertex> verticesToTrigger,
List<ExecutionJobVertex> verticesToWaitFor,
List<ExecutionJobVertex> verticesToCommitTo,
@@ -368,11 +368,13 @@ public class ExecutionGraph implements Serializable {
// disable to make sure existing checkpoint coordinators are cleared
disableSnaphotCheckpointing();
- // create the coordinator that triggers and commits checkpoints and holds the state
- snapshotCheckpointsEnabled = true;
+ // create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator = new CheckpointCoordinator(
jobID,
+ interval,
checkpointTimeout,
+ minPauseBetweenCheckpoints,
+ maxConcurrentCheckpoints,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
@@ -384,10 +386,7 @@ public class ExecutionGraph implements Serializable {
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener(
- checkpointCoordinator.createJobStatusListener(
- actorSystem,
- interval,
- leaderSessionID));
+ checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
}
/**
@@ -401,16 +400,11 @@ public class ExecutionGraph implements Serializable {
throw new IllegalStateException("Job must be in CREATED state");
}
- snapshotCheckpointsEnabled = false;
if (checkpointCoordinator != null) {
checkpointCoordinator.shutdown();
checkpointCoordinator = null;
}
}
-
- public boolean isSnapshotCheckpointsEnabled() {
- return snapshotCheckpointsEnabled;
- }
public CheckpointCoordinator getCheckpointCoordinator() {
return checkpointCoordinator;
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 86c9b60..d58be52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -22,17 +22,17 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import java.util.List;
+import static java.util.Objects.requireNonNull;
+
/**
- * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
- * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices
+ * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
+ * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
* need to participate.
*/
public class JobSnapshottingSettings implements java.io.Serializable{
private static final long serialVersionUID = -2593319571078198180L;
- /** The default time in which pending checkpoints need to be acknowledged before timing out */
- public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
private final List<JobVertexID> verticesToTrigger;
@@ -43,26 +43,32 @@ public class JobSnapshottingSettings implements java.io.Serializable{
private final long checkpointInterval;
private final long checkpointTimeout;
-
-
- public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
- List<JobVertexID> verticesToAcknowledge,
- List<JobVertexID> verticesToConfirm,
- long checkpointInterval)
- {
- this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT);
- }
+
+ private final long minPauseBetweenCheckpoints;
+
+ private final int maxConcurrentCheckpoints;
+
public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
List<JobVertexID> verticesToAcknowledge,
List<JobVertexID> verticesToConfirm,
- long checkpointInterval, long checkpointTimeout)
+ long checkpointInterval, long checkpointTimeout,
+ long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints)
{
- this.verticesToTrigger = verticesToTrigger;
- this.verticesToAcknowledge = verticesToAcknowledge;
- this.verticesToConfirm = verticesToConfirm;
+ // sanity checks
+ if (checkpointInterval < 1 || checkpointTimeout < 1 ||
+ minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1)
+ {
+ throw new IllegalArgumentException();
+ }
+
+ this.verticesToTrigger = requireNonNull(verticesToTrigger);
+ this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
+ this.verticesToConfirm = requireNonNull(verticesToConfirm);
this.checkpointInterval = checkpointInterval;
this.checkpointTimeout = checkpointTimeout;
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
}
// --------------------------------------------------------------------------------------------
@@ -87,11 +93,22 @@ public class JobSnapshottingSettings implements java.io.Serializable{
return checkpointTimeout;
}
+ /**
+  * Returns the configured minimum pause between two checkpoints.
+  * The constructor guarantees that this value is not negative.
+  */
+ public long getMinPauseBetweenCheckpoints() {
+ return this.minPauseBetweenCheckpoints;
+ }
+
+ /**
+  * Returns the maximum number of checkpoints that may be in progress at once.
+  * The constructor guarantees that this value is at least one.
+  */
+ public int getMaxConcurrentCheckpoints() {
+ return this.maxConcurrentCheckpoints;
+ }
+
// --------------------------------------------------------------------------------------------
@Override
public String toString() {
- return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s",
- checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+ return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
+ "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
+ checkpointInterval, checkpointTimeout,
+ minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
+ verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c4d0fbb..8cbb13a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -913,6 +913,8 @@ class JobManager(
executionGraph.enableSnapshotCheckpointing(
snapshotSettings.getCheckpointInterval,
snapshotSettings.getCheckpointTimeout,
+ snapshotSettings.getMinPauseBetweenCheckpoints,
+ snapshotSettings.getMaxConcurrentCheckpoints,
triggerVertices,
ackVertices,
confirmVertices,
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f6ee5c5..cd52fd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -28,10 +28,17 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -39,6 +46,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -70,7 +78,7 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 600000,
+ jid, 600000, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {}, cl,
@@ -116,7 +124,7 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 600000,
+ jid, 600000, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {}, cl,
@@ -160,7 +168,7 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 600000,
+ jid, 600000, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] {}, cl, new StandaloneCheckpointIDCounter(), new
@@ -199,7 +207,7 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 600000,
+ jid, 600000, 600000,
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 },
new ExecutionVertex[] { vertex1, vertex2 }, cl,
@@ -343,7 +351,7 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 600000,
+ jid, 600000, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex }, cl,
@@ -472,7 +480,7 @@ public class CheckpointCoordinatorTest {
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 600000,
+ jid, 600000, 600000,
new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
new ExecutionVertex[] { commitVertex }, cl,
@@ -587,7 +595,7 @@ public class CheckpointCoordinatorTest {
// the timeout for the checkpoint is a 200 milliseconds
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 200,
+ jid, 600000, 200,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex }, cl,
@@ -649,7 +657,7 @@ public class CheckpointCoordinatorTest {
ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
CheckpointCoordinator coord = new CheckpointCoordinator(
- jid, 200000,
+ jid, 200000, 200000,
new ExecutionVertex[] { triggerVertex },
new ExecutionVertex[] { ackVertex1, ackVertex2 },
new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -680,14 +688,343 @@ public class CheckpointCoordinatorTest {
}
}
+ /**
+  * Verifies the periodic checkpoint scheduler: triggered checkpoints carry strictly increasing
+  * IDs and non-decreasing timestamps, stopping the scheduler stops further triggers (tolerating
+  * at most one trigger that was already in flight), and the scheduler can be started again.
+  */
+ @Test
+ public void testPeriodicTriggering() {
+ try {
+ final JobID jid = new JobID();
+ final long start = System.currentTimeMillis();
+
+ // create some mock execution vertices and trigger some checkpoint
+
+ final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+ ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ final AtomicInteger numCalls = new AtomicInteger();
+
+ // intercept every message sent to the trigger vertex: check the ordering of
+ // checkpoint IDs and timestamps, and count how many triggers were sent
+ doAnswer(new Answer<Void>() {
+
+ private long lastId = -1;
+ private long lastTs = -1;
+
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ TriggerCheckpoint message = (TriggerCheckpoint) invocation.getArguments()[0];
+ long id = message.getCheckpointId();
+ long ts = message.getTimestamp();
+
+ assertTrue(id > lastId);
+ assertTrue(ts >= lastTs);
+ assertTrue(ts >= start);
+
+ lastId = id;
+ lastTs = ts;
+ numCalls.incrementAndGet();
+ return null;
+ }
+ }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 10, // periodic interval is 10 ms
+ 200000, // timeout is very long (200 s)
+ new ExecutionVertex[] { triggerVertex },
+ new ExecutionVertex[] { ackVertex },
+ new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+ (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+
+ coord.startCheckpointScheduler();
+
+ // wait (bounded by 60 s) until at least five periodic triggers were observed
+ long timeout = System.currentTimeMillis() + 60000;
+ do {
+ Thread.sleep(20);
+ }
+ while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
+ assertTrue(numCalls.get() >= 5);
+
+ coord.stopCheckpointScheduler();
+
+
+ // for 400 ms, no further calls may come.
+ // there may be the case that one trigger was fired and about to
+ // acquire the lock, such that after cancelling it will still do
+ // the remainder of its work
+ int numCallsSoFar = numCalls.get();
+ Thread.sleep(400);
+ assertTrue(numCallsSoFar == numCalls.get() ||
+ numCallsSoFar+1 == numCalls.get());
+
+ // start another sequence of periodic scheduling
+ numCalls.set(0);
+ coord.startCheckpointScheduler();
+
+ // again wait (bounded by 60 s) for at least five triggers after the restart
+ timeout = System.currentTimeMillis() + 60000;
+ do {
+ Thread.sleep(20);
+ }
+ while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
+ assertTrue(numCalls.get() >= 5);
+
+ coord.stopCheckpointScheduler();
+
+ // for 400 ms, no further calls may come
+ // there may be the case that one trigger was fired and about to
+ // acquire the lock, such that after cancelling it will still do
+ // the remainder of its work
+ numCallsSoFar = numCalls.get();
+ Thread.sleep(400);
+ assertTrue(numCallsSoFar == numCalls.get() ||
+ numCallsSoFar + 1 == numCalls.get());
+
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void testMaxConcurrentAttempts1() {
+ // only a single checkpoint may be in flight at any time
+ final int permittedConcurrentCheckpoints = 1;
+ testMaxConcurrentAttemps(permittedConcurrentCheckpoints);
+ }
+
+ @Test
+ public void testMaxConcurrentAttempts2() {
+ // two checkpoints may be in flight at the same time
+ final int permittedConcurrentCheckpoints = 2;
+ testMaxConcurrentAttemps(permittedConcurrentCheckpoints);
+ }
+
+ @Test
+ public void testMaxConcurrentAttempts5() {
+ // five checkpoints may be in flight at the same time
+ final int permittedConcurrentCheckpoints = 5;
+ testMaxConcurrentAttemps(permittedConcurrentCheckpoints);
+ }
+
+ /**
+  * Runs the periodic scheduler with the given limit on concurrent checkpoints. It checks three
+  * things: while nothing is acknowledged, exactly that many checkpoints are triggered;
+  * acknowledging one checkpoint lets exactly one further checkpoint be triggered; and no
+  * additional checkpoints follow.
+  *
+  * @param maxConcurrentAttempts the maximum number of concurrent checkpoints to configure
+  */
+ private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+ try {
+ final JobID jid = new JobID();
+
+ // create some mock execution vertices and trigger some checkpoint
+ final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+ ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ final AtomicInteger numCalls = new AtomicInteger();
+
+ // count every message that is sent to the trigger vertex
+ doAnswer(new Answer<Void>() {
+ @Override
+ public Void answer(InvocationOnMock invocation) throws Throwable {
+ numCalls.incrementAndGet();
+ return null;
+ }
+ }).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 10, // periodic interval is 10 ms
+ 200000, // timeout is very long (200 s)
+ 0L, // no extra delay
+ maxConcurrentAttempts,
+ new ExecutionVertex[] { triggerVertex },
+ new ExecutionVertex[] { ackVertex },
+ new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+ (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+
+ coord.startCheckpointScheduler();
+
+ // after a while, there should be exactly as many checkpoints
+ // as concurrently permitted
+ long now = System.currentTimeMillis();
+ long timeout = now + 60000;
+ long minDuration = now + 100;
+ do {
+ Thread.sleep(20);
+ }
+ while ((now = System.currentTimeMillis()) < minDuration ||
+ (numCalls.get() < maxConcurrentAttempts && now < timeout));
+
+ assertEquals(maxConcurrentAttempts, numCalls.get());
+
+ verify(triggerVertex, times(maxConcurrentAttempts))
+ .sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID));
+
+ // now, once we acknowledge one checkpoint, it should trigger the next one
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
+
+ // this should have immediately triggered a new checkpoint.
+ // the clock is re-read in the loop condition; otherwise the 60 s timeout would
+ // never expire and a missing trigger would hang the test instead of failing it
+ now = System.currentTimeMillis();
+ timeout = now + 60000;
+ do {
+ Thread.sleep(20);
+ }
+ while (numCalls.get() < maxConcurrentAttempts + 1 &&
+ (now = System.currentTimeMillis()) < timeout);
+
+ assertEquals(maxConcurrentAttempts + 1, numCalls.get());
+
+ // no further checkpoints should happen
+ Thread.sleep(200);
+ assertEquals(maxConcurrentAttempts + 1, numCalls.get());
+
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+  * Verifies that, with at most two concurrent checkpoints, acknowledging the second pending
+  * checkpoint also removes the first one. Afterwards checkpoints 3 and 4 are the only
+  * pending ones.
+  */
+ @Test
+ public void testMaxConcurrentAttempsWithSubsumption() {
+ try {
+ final int maxConcurrentAttempts = 2;
+ final JobID jid = new JobID();
+
+ // create some mock execution vertices and trigger some checkpoint
+ final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+ ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 10, // periodic interval is 10 ms
+ 200000, // timeout is very long (200 s)
+ 0L, // no extra delay
+ maxConcurrentAttempts, // max two concurrent checkpoints
+ new ExecutionVertex[] { triggerVertex },
+ new ExecutionVertex[] { ackVertex },
+ new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+ (), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+
+ coord.startCheckpointScheduler();
+
+ // after a while, there should be exactly as many checkpoints
+ // as concurrently permitted
+ long now = System.currentTimeMillis();
+ long timeout = now + 60000;
+ long minDuration = now + 100;
+ do {
+ Thread.sleep(20);
+ }
+ while ((now = System.currentTimeMillis()) < minDuration ||
+ (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+
+ // validate that the pending checkpoints are there
+ assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
+ assertNotNull(coord.getPendingCheckpoints().get(1L));
+ assertNotNull(coord.getPendingCheckpoints().get(2L));
+
+ // now we acknowledge the second checkpoint, which should subsume the first checkpoint
+ // and allow two more checkpoints to be triggered
+ coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
+
+ // after a while, there should be the new checkpoints
+ final long newTimeout = System.currentTimeMillis() + 60000;
+ do {
+ Thread.sleep(20);
+ }
+ while (coord.getPendingCheckpoints().get(4L) == null &&
+ System.currentTimeMillis() < newTimeout);
+
+ // do the final check
+ assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
+ assertNotNull(coord.getPendingCheckpoints().get(3L));
+ assertNotNull(coord.getPendingCheckpoints().get(4L));
+
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+  * Verifies that the periodic scheduler starts no checkpoint while the task to trigger is still
+  * in state CREATED. Once that task reports RUNNING, the scheduler starts checkpointing.
+  */
+ @Test
+ public void testPeriodicSchedulingWithInactiveTasks() {
+ try {
+ final JobID jid = new JobID();
+
+ // create some mock execution vertices and trigger some checkpoint
+ final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+ final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+ ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+ ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+ ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+ // the trigger task reports whatever state is currently stored here (initially CREATED)
+ final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
+ when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
+ new Answer<ExecutionState>() {
+ @Override
+ public ExecutionState answer(InvocationOnMock invocation){
+ return currentState.get();
+ }
+ });
+
+ CheckpointCoordinator coord = new CheckpointCoordinator(
+ jid,
+ 10, // periodic interval is 10 ms
+ 200000, // timeout is very long (200 s)
+ 0L, // no extra delay
+ 2, // max two concurrent checkpoints
+ new ExecutionVertex[] { triggerVertex },
+ new ExecutionVertex[] { ackVertex },
+ new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+ coord.startCheckpointScheduler();
+
+ // no checkpoint should have started so far
+ Thread.sleep(200);
+ assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+ // now move the state to RUNNING
+ currentState.set(ExecutionState.RUNNING);
+
+ // the coordinator should start checkpointing now
+ final long timeout = System.currentTimeMillis() + 10000;
+ do {
+ Thread.sleep(20);
+ }
+ while (System.currentTimeMillis() < timeout &&
+ coord.getNumberOfPendingCheckpoints() == 0);
+
+ assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
+
+ // release the coordinator (and its periodic scheduler), as the other tests do
+ coord.shutdown();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
}
- private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) {
+ private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID,
+ ExecutionState state, ExecutionState ... successiveStates) {
final Execution exec = mock(Execution.class);
when(exec.getAttemptId()).thenReturn(attemptID);
- when(exec.getState()).thenReturn(state);
+ when(exec.getState()).thenReturn(state, successiveStates);
ExecutionVertex vertex = mock(ExecutionVertex.class);
when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7b2c2d4..bec04bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -80,7 +80,7 @@ public class CheckpointStateRestoreTest {
map.put(statelessId, stateless);
- CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
+ CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0], cl,
@@ -151,7 +151,7 @@ public class CheckpointStateRestoreTest {
map.put(statelessId, stateless);
- CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
+ CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, 200000L,
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
new ExecutionVertex[0], cl,
@@ -193,7 +193,7 @@ public class CheckpointStateRestoreTest {
@Test
public void testNoCheckpointAvailable() {
try {
- CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L,
+ CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L, 200000L,
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[] { mock(ExecutionVertex.class) },
new ExecutionVertex[0], cl,
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index f6e4ab8..1c666e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -61,7 +61,8 @@ public class CoordinatorShutdownTest {
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
- testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+ testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+ 5000, 60000, 0L, Integer.MAX_VALUE));
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
@@ -112,7 +113,8 @@ public class CoordinatorShutdownTest {
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
- testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+ testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+ 5000, 60000, 0L, Integer.MAX_VALUE));
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
new file mode 100644
index 0000000..0320d6b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Configuration that captures all checkpointing related settings.
+ */
+public class CheckpointConfig implements java.io.Serializable {
+
+ private static final long serialVersionUID = -750378776078908147L;
+
+ /** The default checkpoint mode: exactly once */
+ public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+
+ /** The default timeout of a checkpoint attempt: 10 minutes */
+ public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+
+ /** The default minimum pause to be made between checkpoints: none */
+ public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+
+ /** The default limit of concurrently happening checkpoints: one */
+ public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+
+ // ------------------------------------------------------------------------
+
+ /** Checkpointing mode (exactly-once vs. at-least-once). */
+ private CheckpointingMode checkpointingMode = DEFAULT_MODE;
+
+ /** Periodic checkpoint triggering interval */
+ private long checkpointInterval = -1; // disabled
+
+ /** Maximum time checkpoint may take before being discarded */
+ private long checkpointTimeout = DEFAULT_TIMEOUT;
+
+ /** Minimal pause between checkpointing attempts */
+ private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
+
+ /** Maximum number of checkpoint attempts in progress at the same time */
+ private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
+
+ /** Flag to force checkpointing in iterative jobs */
+ private boolean forceCheckpointing;
+
+ // ------------------------------------------------------------------------
+
+ /**
+  * Checks whether checkpointing is enabled.
+  *
+  * <p>Checkpointing counts as enabled once a positive checkpoint interval is configured.
+  * The interval starts out as {@code -1}, which means disabled.
+  *
+  * @return True if checkpointing is enabled, false otherwise.
+  */
+ public boolean isCheckpointingEnabled() {
+ return checkpointInterval > 0;
+ }
+
+ /**
+  * Returns the configured checkpointing mode (exactly-once or at-least-once).
+  * Unless it is changed through {@link #setCheckpointingMode(CheckpointingMode)},
+  * this is {@link #DEFAULT_MODE}.
+  *
+  * @return The checkpointing mode.
+  */
+ public CheckpointingMode getCheckpointingMode() {
+ return this.checkpointingMode;
+ }
+
+ /**
+  * Changes the checkpointing mode (exactly-once or at-least-once).
+  *
+  * @param checkpointingMode The checkpointing mode; must not be null.
+  * @throws NullPointerException if {@code checkpointingMode} is null
+  */
+ public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
+ requireNonNull(checkpointingMode);
+ this.checkpointingMode = checkpointingMode;
+ }
+
+ /**
+  * Returns the base interval, in milliseconds, at which checkpoints are periodically scheduled.
+  * The field starts out as {@code -1}, which means no interval has been configured.
+  *
+  * <p>Actual triggering may happen later than this interval, as governed by
+  * {@link #getMaxConcurrentCheckpoints()} and {@link #getMinPauseBetweenCheckpoints()}.
+  *
+  * @return The checkpoint interval, in milliseconds.
+  */
+ public long getCheckpointInterval() {
+ return this.checkpointInterval;
+ }
+
+ /**
+  * Sets the interval in which checkpoints are periodically scheduled.
+  *
+  * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings
+  * {@link #setMaxConcurrentCheckpoints(int)} and {@link #getMinPauseBetweenCheckpoints()}.
+  *
+  * @param checkpointInterval The checkpoint interval, in milliseconds.
+  * @throws IllegalArgumentException if {@code checkpointInterval} is zero or negative
+  */
+ public void setCheckpointInterval(long checkpointInterval) {
+ if (checkpointInterval <= 0) {
+ throw new IllegalArgumentException("Checkpoint interval must be larger than zero");
+ }
+ this.checkpointInterval = checkpointInterval;
+ }
+
+ /**
+  * Returns how long, in milliseconds, a checkpoint may take before it is discarded.
+  * Unless it is changed, this is {@link #DEFAULT_TIMEOUT} (10 minutes).
+  *
+  * @return The checkpoint timeout, in milliseconds.
+  */
+ public long getCheckpointTimeout() {
+ return this.checkpointTimeout;
+ }
+
+ /**
+ * Sets the maximum time that a checkpoint may take before being discarded.
+ *
+ * @param checkpointTimeout The checkpoint timeout, in milliseconds.
+ * @throws IllegalArgumentException If the timeout is zero or negative.
+ */
+ public void setCheckpointTimeout(long checkpointTimeout) {
+ // validate the new timeout itself, not the (unrelated) checkpoint interval
+ if (checkpointTimeout <= 0) {
+ throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+ }
+ this.checkpointTimeout = checkpointTimeout;
+ }
+
+ /**
+ * Returns the minimal pause between two checkpointing attempts. Even after the limit on concurrent
+ * checkpoints (see {@link #getMaxConcurrentCheckpoints()}) would allow another checkpoint, this
+ * value determines how soon the checkpoint coordinator may actually trigger it.
+ *
+ * @return The minimal pause before the next checkpoint is triggered.
+ */
+ public long getMinPauseBetweenCheckpoints() {
+ return this.minPauseBetweenCheckpoints;
+ }
+
+// /**
+// * Sets the minimal pause between checkpointing attempts. This setting defines how soon the
+// * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
+// * another checkpoint with respect to the maximum number of concurrent checkpoints
+// * (see {@link #setMaxConcurrentCheckpoints(int)}).
+// *
+// * <p>If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure
+// * that a minimum amount of time passes where no checkpoint is in progress at all.
+// *
+// * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
+// */
+// public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
+// if (minPauseBetweenCheckpoints < 0) {
+// throw new IllegalArgumentException("Pause value must be zero or positive");
+// }
+// this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+// }
+
+ /**
+ * Returns the upper bound on checkpoint attempts that may be in flight simultaneously. With a
+ * value of <i>n</i>, no new checkpoint is triggered while <i>n</i> attempts are still in progress;
+ * one of them has to finish or expire first.
+ *
+ * @return The maximum number of concurrent checkpoint attempts.
+ */
+ public int getMaxConcurrentCheckpoints() {
+ return this.maxConcurrentCheckpoints;
+ }
+
+ /**
+ * Limits how many checkpoint attempts may be in flight simultaneously. With a value of <i>n</i>,
+ * no new checkpoint is triggered while <i>n</i> attempts are still in progress; one of them has
+ * to finish or expire first.
+ *
+ * @param maxConcurrentCheckpoints The maximum number of concurrent checkpoint attempts.
+ * @throws IllegalArgumentException If the given value is smaller than one.
+ */
+ public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+ if (maxConcurrentCheckpoints >= 1) {
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ }
+ else {
+ throw new IllegalArgumentException("The maximum number of concurrent attempts must be at least one.");
+ }
+ }
+
+ /**
+ * Returns whether checkpointing is forced even though iteration feedback currently cannot be
+ * checkpointed.
+ *
+ * @return True, if checkpointing is forced, false otherwise.
+ *
+ * @deprecated This will be removed once iterations properly participate in checkpointing.
+ */
+ @Deprecated
+ public boolean isForceCheckpointing() {
+ return this.forceCheckpointing;
+ }
+
+ /**
+ * Sets whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
+ * Without this flag, iterative jobs with checkpointing enabled are rejected when the job graph
+ * is created.
+ *
+ * @param forceCheckpointing The flag to force checkpointing.
+ *
+ * @deprecated This will be removed once iterations properly participate in checkpointing.
+ */
+ @Deprecated
+ public void setForceCheckpointing(boolean forceCheckpointing) {
+ this.forceCheckpointing = forceCheckpointing;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 72722bf..cb5fce5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -114,17 +114,14 @@ public abstract class StreamExecutionEnvironment {
/** The execution configuration for this environment */
private final ExecutionConfig config = new ExecutionConfig();
+ /** Settings that control the checkpointing behavior */
+ private final CheckpointConfig checkpointCfg = new CheckpointConfig();
+
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
protected boolean isChainingEnabled = true;
-
- protected long checkpointInterval = -1; // disabled
-
- protected CheckpointingMode checkpointingMode;
-
- protected boolean forceCheckpointing = false;
/** The state backend used for storing k/v state and state snapshots */
private StateBackend<?> defaultStateBackend;
@@ -239,7 +236,17 @@ public abstract class StreamExecutionEnvironment {
// ------------------------------------------------------------------------
// Checkpointing Settings
// ------------------------------------------------------------------------
-
+
+ /**
+ * Gets the checkpoint config, which defines values like checkpoint interval, delay between
+ * checkpoints, etc.
+ *
+ * @return The checkpoint config.
+ */
+ public CheckpointConfig getCheckpointConfig() {
+ return checkpointCfg;
+ }
+
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming
@@ -257,7 +264,8 @@ public abstract class StreamExecutionEnvironment {
* @param interval Time interval between state checkpoints in milliseconds.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval) {
- return enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
+ checkpointCfg.setCheckpointInterval(interval);
+ return this;
}
/**
@@ -280,15 +288,8 @@ public abstract class StreamExecutionEnvironment {
* The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
*/
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
- if (mode == null) {
- throw new NullPointerException("checkpoint mode must not be null");
- }
- if (interval <= 0) {
- throw new IllegalArgumentException("the checkpoint interval must be positive");
- }
-
- this.checkpointInterval = interval;
- this.checkpointingMode = mode;
+ checkpointCfg.setCheckpointingMode(mode);
+ checkpointCfg.setCheckpointInterval(interval);
return this;
}
@@ -312,10 +313,11 @@ public abstract class StreamExecutionEnvironment {
* If true checkpointing will be enabled for iterative jobs as well.
*/
@Deprecated
+ @SuppressWarnings("deprecation")
public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
- this.enableCheckpointing(interval, mode);
-
- this.forceCheckpointing = force;
+ checkpointCfg.setCheckpointingMode(mode);
+ checkpointCfg.setCheckpointInterval(interval);
+ checkpointCfg.setForceCheckpointing(force);
return this;
}
@@ -337,32 +339,39 @@ public abstract class StreamExecutionEnvironment {
*/
@Deprecated
public StreamExecutionEnvironment enableCheckpointing() {
- enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+ checkpointCfg.setCheckpointInterval(500);
return this;
}
/**
* Returns the checkpointing interval or -1 if checkpointing is disabled.
+ *
+ * <p>Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}.
*
* @return The checkpointing interval or -1
*/
public long getCheckpointInterval() {
- return checkpointInterval;
+ return checkpointCfg.getCheckpointInterval();
}
-
/**
* Returns whether checkpointing is force-enabled.
*/
+ @Deprecated
+ @SuppressWarnings("deprecation")
public boolean isForceCheckpointing() {
- return forceCheckpointing;
+ return checkpointCfg.isForceCheckpointing();
}
/**
- * Returns the {@link CheckpointingMode}.
+ * Returns the checkpointing mode (exactly-once vs. at-least-once).
+ *
+ * <p>Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}.
+ *
+ * @return The checkpointing mode.
*/
public CheckpointingMode getCheckpointingMode() {
- return checkpointingMode;
+ return checkpointCfg.getCheckpointingMode();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7d8f9f9..bc3acb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -23,15 +23,12 @@ import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,8 +40,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -59,7 +56,6 @@ import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
-import org.apache.sling.commons.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,9 +66,6 @@ import org.slf4j.LoggerFactory;
*
*/
public class StreamGraph extends StreamingPlan {
-
- /** The default interval for checkpoints, in milliseconds */
- public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
@@ -80,11 +73,9 @@ public class StreamGraph extends StreamingPlan {
private final StreamExecutionEnvironment environemnt;
private final ExecutionConfig executionConfig;
-
- private CheckpointingMode checkpointingMode;
- private boolean checkpointingEnabled = false;
- private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
- private boolean chaining = true;
+ private final CheckpointConfig checkpointConfig;
+
+ private boolean chaining;
private Map<Integer, StreamNode> streamNodes;
private Set<Integer> sources;
@@ -97,12 +88,11 @@ public class StreamGraph extends StreamingPlan {
private StateBackend<?> stateBackend;
private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
- private boolean forceCheckpoint = false;
public StreamGraph(StreamExecutionEnvironment environment) {
-
this.environemnt = environment;
- executionConfig = environment.getConfig();
+ this.executionConfig = environment.getConfig();
+ this.checkpointConfig = environment.getCheckpointConfig();
// create an empty new stream graph.
clear();
@@ -112,19 +102,23 @@ public class StreamGraph extends StreamingPlan {
* Remove all registered nodes etc.
*/
public void clear() {
- streamNodes = Maps.newHashMap();
- virtualSelectNodes = Maps.newHashMap();
- virtuaPartitionNodes = Maps.newHashMap();
- vertexIDtoBrokerID = Maps.newHashMap();
- vertexIDtoLoopTimeout = Maps.newHashMap();
- iterationSourceSinkPairs = Sets.newHashSet();
- sources = Sets.newHashSet();
- sinks = Sets.newHashSet();
+ streamNodes = new HashMap<>();
+ virtualSelectNodes = new HashMap<>();
+ virtuaPartitionNodes = new HashMap<>();
+ vertexIDtoBrokerID = new HashMap<>();
+ vertexIDtoLoopTimeout = new HashMap<>();
+ iterationSourceSinkPairs = new HashSet<>();
+ sources = new HashSet<>();
+ sinks = new HashSet<>();
}
- protected ExecutionConfig getExecutionConfig() {
+ public ExecutionConfig getExecutionConfig() {
return executionConfig;
}
+
+ public CheckpointConfig getCheckpointConfig() {
+ return checkpointConfig;
+ }
public String getJobName() {
return jobName;
@@ -138,18 +132,6 @@ public class StreamGraph extends StreamingPlan {
this.chaining = chaining;
}
- public void setCheckpointingEnabled(boolean checkpointingEnabled) {
- this.checkpointingEnabled = checkpointingEnabled;
- }
-
- public void setCheckpointingInterval(long checkpointingInterval) {
- this.checkpointingInterval = checkpointingInterval;
- }
-
- public void forceCheckpoint() {
- this.forceCheckpoint = true;
- }
-
public void setStateBackend(StateBackend<?> backend) {
this.stateBackend = backend;
}
@@ -158,27 +140,11 @@ public class StreamGraph extends StreamingPlan {
return this.stateBackend;
}
- public long getCheckpointingInterval() {
- return checkpointingInterval;
- }
-
// Checkpointing
public boolean isChainingEnabled() {
return chaining;
}
-
- public boolean isCheckpointingEnabled() {
- return checkpointingEnabled;
- }
-
- public CheckpointingMode getCheckpointingMode() {
- return checkpointingMode;
- }
-
- public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
- this.checkpointingMode = checkpointingMode;
- }
public boolean isIterative() {
@@ -322,7 +288,7 @@ public class StreamGraph extends StreamingPlan {
downStreamVertexID,
typeNumber,
null,
- Lists.<String>newArrayList());
+ new ArrayList<String>());
}
@@ -463,10 +429,7 @@ public class StreamGraph extends StreamingPlan {
}
public StreamEdge getStreamEdge(int sourceId, int targetId) {
- Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator();
- while (outIterator.hasNext()) {
- StreamEdge edge = outIterator.next();
-
+ for (StreamEdge edge : getStreamNode(sourceId).getOutEdges()) {
if (edge.getTargetId() == targetId) {
return edge;
}
@@ -505,8 +468,7 @@ public class StreamGraph extends StreamingPlan {
return vertexIDtoLoopTimeout.get(vertexID);
}
- public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
-
+ public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
StreamNode source = this.addNode(sourceId,
StreamIterationHead.class,
null,
@@ -537,15 +499,12 @@ public class StreamGraph extends StreamingPlan {
return iterationSourceSinkPairs;
}
- protected void removeEdge(StreamEdge edge) {
-
+ private void removeEdge(StreamEdge edge) {
edge.getSourceVertex().getOutEdges().remove(edge);
edge.getTargetVertex().getInEdges().remove(edge);
-
}
- protected void removeVertex(StreamNode toRemove) {
-
+ private void removeVertex(StreamNode toRemove) {
Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
edgesToRemove.addAll(toRemove.getInEdges());
@@ -560,9 +519,10 @@ public class StreamGraph extends StreamingPlan {
/**
* Gets the assembled {@link JobGraph}.
*/
+ @SuppressWarnings("deprecation")
public JobGraph getJobGraph() {
// temporarily forbid checkpointing for iterative jobs
- if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
+ if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
throw new UnsupportedOperationException(
"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
@@ -576,16 +536,12 @@ public class StreamGraph extends StreamingPlan {
@Override
public String getStreamingPlanAsJSON() {
-
try {
return new JSONGenerator(this).getJSON();
- } catch (JSONException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("JSON plan creation failed: {}", e);
- }
- return "";
}
-
+ catch (Exception e) {
+ throw new RuntimeException("JSON plan creation failed", e);
+ }
}
@Override
@@ -606,5 +562,4 @@ public class StreamGraph extends StreamingPlan {
public static enum ResourceStrategy {
DEFAULT, ISOLATE, NEWGROUP
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 8bd0e48..4bd7a73 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,17 +98,8 @@ public class StreamGraphGenerator {
private StreamGraphGenerator(StreamExecutionEnvironment env) {
this.streamGraph = new StreamGraph(env);
this.streamGraph.setChaining(env.isChainingEnabled());
-
- if (env.getCheckpointInterval() > 0) {
- this.streamGraph.setCheckpointingEnabled(true);
- this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
- this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
- }
+ // the state backend is not part of the CheckpointConfig, so it must still be forwarded explicitly
this.streamGraph.setStateBackend(env.getStateBackend());
- if (env.isForceCheckpointing()) {
- this.streamGraph.forceCheckpoint();
- }
-
this.env = env;
this.alreadyTransformed = new HashMap<>();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 613d381..515e362 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.graph;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -27,10 +26,10 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -45,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.InstantiationUtil;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -274,18 +275,20 @@ public class StreamingJobGraphGenerator {
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
- config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
- if (streamGraph.isCheckpointingEnabled()) {
- config.setCheckpointMode(streamGraph.getCheckpointingMode());
+ final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig();
+
+ config.setCheckpointingEnabled(checkpointCfg.isCheckpointingEnabled());
+ if (checkpointCfg.isCheckpointingEnabled()) {
+ config.setCheckpointMode(checkpointCfg.getCheckpointingMode());
config.setStateBackend(streamGraph.getStateBackend());
- } else {
- // the at least once input handler is slightly cheaper (in the absence of checkpoints),
+ }
+ else {
+ // the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints),
// so we use that one if checkpointing is not enabled
config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
}
- config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
+ config.setStatePartitioner(vertex.getStatePartitioner());
config.setStateKeySerializer(vertex.getStateKeySerializer());
-
Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
@@ -385,8 +388,10 @@ public class StreamingJobGraphGenerator {
}
private void configureCheckpointing() {
- if (streamGraph.isCheckpointingEnabled()) {
- long interval = streamGraph.getCheckpointingInterval();
+ CheckpointConfig cfg = streamGraph.getCheckpointConfig();
+
+ if (cfg.isCheckpointingEnabled()) {
+ long interval = cfg.getCheckpointInterval();
if (interval < 1) {
throw new IllegalArgumentException("The checkpoint interval must be positive");
}
@@ -400,10 +405,9 @@ public class StreamingJobGraphGenerator {
List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
// collect the vertices that receive "commit checkpoint" messages
- // currently, these are all certices
+ // currently, these are all vertices
List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
-
for (JobVertex vertex : jobVertices.values()) {
if (vertex.isInputVertex()) {
triggerVertices.add(vertex.getID());
@@ -414,7 +418,9 @@ public class StreamingJobGraphGenerator {
}
JobSnapshottingSettings settings = new JobSnapshottingSettings(
- triggerVertices, ackVertices, commitVertices, interval);
+ triggerVertices, ackVertices, commitVertices, interval,
+ cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
+ cfg.getMaxConcurrentCheckpoints());
jobGraph.setSnapshotSettings(settings);
// if the user enabled checkpointing, the default number of exec retries is infinitive.
http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 14f23e1..69147f6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -96,6 +96,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
// ------------------------------------------------------------------------
// Checkpointing Settings
// ------------------------------------------------------------------------
+
+ /**
+ * Gets the checkpoint config, which defines values like checkpoint interval, delay between
+ * checkpoints, etc.
+ */
+ def getCheckpointConfig = javaEnv.getCheckpointConfig()
+
/**
* Enables checkpointing for the streaming job. The distributed state of the streaming
* dataflow will be periodically snapshotted. In case of a failure, the streaming