You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/14 15:17:06 UTC
flink git commit: [FLINK-6964] [checkpoint] Fix externalized
incremental checkpoints for StandaloneCompletedCheckpointStore
Repository: flink
Updated Branches:
refs/heads/master 94d3166b4 -> 8cff17fcc
[FLINK-6964] [checkpoint] Fix externalized incremental checkpoints for StandaloneCompletedCheckpointStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cff17fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cff17fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cff17fc
Branch: refs/heads/master
Commit: 8cff17fcc9b4bca6499c26fc2a6318c759cbf251
Parents: 94d3166
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jun 21 11:22:28 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 14 15:52:09 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 12 +-
.../checkpoint/CheckpointCoordinator.java | 34 ++-
.../savepoint/SavepointV2Serializer.java | 7 +-
.../state/IncrementalKeyedStateHandle.java | 21 +-
.../runtime/state/SharedStateRegistry.java | 45 +++-
.../ZooKeeperCompletedCheckpointStoreTest.java | 19 +-
.../savepoint/CheckpointTestUtils.java | 2 +-
.../state/IncrementalKeyedStateHandleTest.java | 3 +-
.../runtime/testingUtils/TestingCluster.scala | 49 +++-
.../testingUtils/TestingJobManagerLike.scala | 59 ++++-
.../TestingJobManagerMessages.scala | 39 +++-
.../ExternalizedCheckpointITCase.java | 228 +++++++++++++++++++
12 files changed, 482 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 291973c..9d289b4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -185,6 +185,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** The identifier of the last completed checkpoint. */
private long lastCompletedCheckpointId = -1;
+ /** Unique ID of this backend. */
+ private UUID backendUID;
+
private static final String SST_FILE_SUFFIX = ".sst";
public RocksDBKeyedStateBackend(
@@ -233,6 +236,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.kvStateInformation = new HashMap<>();
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
+ this.backendUID = UUID.randomUUID();
+ LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
}
/**
@@ -926,7 +931,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
return new IncrementalKeyedStateHandle(
- stateBackend.operatorIdentifier,
+ stateBackend.backendUID,
stateBackend.keyGroupRange,
checkpointId,
sstFiles,
@@ -1438,6 +1443,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
} else {
+ // pick up the old backend id again, so that we can reference existing state
+ stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
+
+ LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
+ stateBackend.operatorIdentifier, stateBackend.backendUID);
// create hard links in the instance directory
if (!stateBackend.instanceRocksDBPath.mkdirs()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 905d132..3e36158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
@@ -417,6 +418,36 @@ public class CheckpointCoordinator {
return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
}
+ /**
+ * Test method to trigger a checkpoint/savepoint.
+ *
+ * @param timestamp The timestamp for the checkpoint.
+ * @param options The checkpoint options.
+ * @return A future to the completed checkpoint
+ */
+ @VisibleForTesting
+ @Internal
+ public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
+ switch (options.getCheckpointType()) {
+ case SAVEPOINT:
+ return triggerSavepoint(timestamp, options.getTargetLocation());
+
+ case FULL_CHECKPOINT:
+ CheckpointTriggerResult triggerResult =
+ triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, false);
+
+ if (triggerResult.isSuccess()) {
+ return triggerResult.getPendingCheckpoint().getCompletionFuture();
+ } else {
+ Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
+ return FlinkCompletableFuture.completedExceptionally(cause);
+ }
+
+ default:
+ throw new IllegalArgumentException("Unknown checkpoint type: " + options.getCheckpointType());
+ }
+ }
+
@VisibleForTesting
CheckpointTriggerResult triggerCheckpoint(
long timestamp,
@@ -1092,6 +1123,7 @@ public class CheckpointCoordinator {
CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
job, tasks, savepointPath, userClassLoader, allowNonRestored);
+ savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpointStore.addCheckpoint(savepoint);
// Reset the checkpoint ID counter
@@ -1099,7 +1131,7 @@ public class CheckpointCoordinator {
checkpointIdCounter.setCount(nextCheckpointId);
LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
-
+
return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index da0022c..c8d695f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -45,6 +45,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* (De)serializer for checkpoint metadata format version 2.
@@ -320,7 +321,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
- dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier());
+ dos.writeUTF(String.valueOf(incrementalKeyedStateHandle.getBackendIdentifier()));
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
@@ -380,7 +381,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
} else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
long checkpointId = dis.readLong();
- String operatorId = dis.readUTF();
+ String backendId = dis.readUTF();
int startKeyGroup = dis.readInt();
int numKeyGroups = dis.readInt();
KeyGroupRange keyGroupRange =
@@ -391,7 +392,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
return new IncrementalKeyedStateHandle(
- operatorId,
+ UUID.fromString(backendId),
keyGroupRange,
checkpointId,
sharedStates,
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 770b5a9..0085890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.state;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
+import java.util.UUID;
/**
* The handle to states of an incremental snapshot.
@@ -57,9 +59,10 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private static final long serialVersionUID = -8328808513197388231L;
/**
- * The operator instance identifier for this handle
+ * UUID to identify the backend which created this state handle. This is used in creating the key for the
+ * {@link SharedStateRegistry}.
*/
- private final String operatorIdentifier;
+ private final UUID backendIdentifier;
/**
* The key-group range covered by this state handle
@@ -97,14 +100,14 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private transient SharedStateRegistry sharedStateRegistry;
public IncrementalKeyedStateHandle(
- String operatorIdentifier,
+ UUID backendIdentifier,
KeyGroupRange keyGroupRange,
long checkpointId,
Map<StateHandleID, StreamStateHandle> sharedState,
Map<StateHandleID, StreamStateHandle> privateState,
StreamStateHandle metaStateHandle) {
- this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
+ this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier);
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
this.checkpointId = checkpointId;
this.sharedState = Preconditions.checkNotNull(sharedState);
@@ -134,8 +137,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
return metaStateHandle;
}
- public String getOperatorIdentifier() {
- return operatorIdentifier;
+ public UUID getBackendIdentifier() {
+ return backendIdentifier;
}
@Override
@@ -231,7 +234,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
*/
@VisibleForTesting
public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
- return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId);
+ return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
}
/**
@@ -252,7 +255,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
if (getCheckpointId() != that.getCheckpointId()) {
return false;
}
- if (!getOperatorIdentifier().equals(that.getOperatorIdentifier())) {
+ if (!getBackendIdentifier().equals(that.getBackendIdentifier())) {
return false;
}
if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
@@ -273,7 +276,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@VisibleForTesting
@Override
public int hashCode() {
- int result = getOperatorIdentifier().hashCode();
+ int result = getBackendIdentifier().hashCode();
result = 31 * result + getKeyGroupRange().hashCode();
result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
result = 31 * result + getSharedState().hashCode();
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index af9ac9d..e0ca873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.util.Preconditions;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +48,7 @@ public class SharedStateRegistry {
/** Executor for async state deletion */
private final Executor asyncDisposalExecutor;
+ /** Default uses direct executor to delete unreferenced state */
public SharedStateRegistry() {
this(Executors.directExecutor());
}
@@ -83,11 +85,16 @@ public class SharedStateRegistry {
entry = registeredStates.get(registrationKey);
if (entry == null) {
+
+ // Additional check that should never fail, because only state handles that are not placeholders should
+ // ever be inserted to the registry.
+ Preconditions.checkState(!isPlaceholder(state), "Attempt to reference unknown state: " + registrationKey);
+
entry = new SharedStateRegistry.SharedStateEntry(state);
registeredStates.put(registrationKey, entry);
} else {
// delete if this is a real duplicate
- if (!Objects.equals(state, entry.state)) {
+ if (!Objects.equals(state, entry.stateHandle)) {
scheduledStateDeletion = state;
}
entry.increaseReferenceCount();
@@ -95,6 +102,7 @@ public class SharedStateRegistry {
}
scheduleAsyncDelete(scheduledStateDeletion);
+ LOG.trace("Registered shared state {} under key {}.", entry, registrationKey);
return new Result(entry);
}
@@ -112,9 +120,10 @@ public class SharedStateRegistry {
final Result result;
final StreamStateHandle scheduledStateDeletion;
+ SharedStateRegistry.SharedStateEntry entry;
synchronized (registeredStates) {
- SharedStateRegistry.SharedStateEntry entry = registeredStates.get(registrationKey);
+ entry = registeredStates.get(registrationKey);
Preconditions.checkState(entry != null,
"Cannot unregister a state that is not registered.");
@@ -124,7 +133,7 @@ public class SharedStateRegistry {
// Remove the state from the registry when it's not referenced any more.
if (entry.getReferenceCount() <= 0) {
registeredStates.remove(registrationKey);
- scheduledStateDeletion = entry.getState();
+ scheduledStateDeletion = entry.getStateHandle();
result = new Result(null, 0);
} else {
scheduledStateDeletion = null;
@@ -132,6 +141,7 @@ public class SharedStateRegistry {
}
}
+ LOG.trace("Unregistered shared state {} under key {}.", entry, registrationKey);
scheduleAsyncDelete(scheduledStateDeletion);
return result;
}
@@ -142,6 +152,7 @@ public class SharedStateRegistry {
* @param stateHandles The shared states to register.
*/
public void registerAll(Iterable<? extends CompositeStateHandle> stateHandles) {
+
if (stateHandles == null) {
return;
}
@@ -156,6 +167,8 @@ public class SharedStateRegistry {
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
// We do the small optimization to not issue discards for placeholders, which are NOPs.
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
+
+ LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
asyncDisposalExecutor.execute(
new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
}
@@ -171,18 +184,18 @@ public class SharedStateRegistry {
private static class SharedStateEntry {
/** The shared state handle */
- private final StreamStateHandle state;
+ private final StreamStateHandle stateHandle;
/** The current reference count of the state handle */
private int referenceCount;
SharedStateEntry(StreamStateHandle value) {
- this.state = value;
+ this.stateHandle = value;
this.referenceCount = 1;
}
- StreamStateHandle getState() {
- return state;
+ StreamStateHandle getStateHandle() {
+ return stateHandle;
}
int getReferenceCount() {
@@ -196,6 +209,14 @@ public class SharedStateRegistry {
void decreaseReferenceCount() {
--referenceCount;
}
+
+ @Override
+ public String toString() {
+ return "SharedStateEntry{" +
+ "stateHandle=" + stateHandle +
+ ", referenceCount=" + referenceCount +
+ '}';
+ }
}
/**
@@ -210,7 +231,7 @@ public class SharedStateRegistry {
private final int referenceCount;
private Result(SharedStateEntry sharedStateEntry) {
- this.reference = sharedStateEntry.getState();
+ this.reference = sharedStateEntry.getStateHandle();
this.referenceCount = sharedStateEntry.getReferenceCount();
}
@@ -228,6 +249,14 @@ public class SharedStateRegistry {
public int getReferenceCount() {
return referenceCount;
}
+
+ @Override
+ public String toString() {
+ return "Result{" +
+ "reference=" + reference +
+ ", referenceCount=" + referenceCount +
+ '}';
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index b5854dd..91bab85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,12 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -31,6 +25,13 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
@@ -51,6 +52,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@@ -160,9 +162,12 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
stateStorage,
Executors.directExecutor());
- SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
+ verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+ verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+
CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
// check that we return the latest retrievable checkpoint
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index f985573..de1f599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -274,7 +274,7 @@ public class CheckpointTestUtils {
public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
return new IncrementalKeyedStateHandle(
- createRandomUUID(rnd).toString(),
+ createRandomUUID(rnd),
new KeyGroupRange(1, 1),
42L,
createRandomStateHandleMap(rnd),
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index 2a6975a..c1b3ccd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
import java.util.Map;
import java.util.Random;
+import java.util.UUID;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@@ -187,7 +188,7 @@ public class IncrementalKeyedStateHandleTest {
private static IncrementalKeyedStateHandle create(Random rnd) {
return new IncrementalKeyedStateHandle(
- "test",
+ UUID.nameUUIDFromBytes("test".getBytes()),
KeyGroupRange.of(0, 0),
1L,
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9e0a6e1..0e3eae5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,10 +26,10 @@ import akka.pattern.Patterns._
import akka.pattern.ask
import akka.testkit.CallingThreadDispatcher
import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions}
+import org.apache.flink.configuration.{Configuration, JobManagerOptions}
import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory}
import org.apache.flink.runtime.clusterframework.FlinkResourceManager
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -39,11 +39,12 @@ import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager}
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages
import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.metrics.MetricRegistry
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{CheckpointRequest, CheckpointRequestFailure, CheckpointRequestSuccess, ResponseSavepoint}
import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
import org.apache.flink.runtime.testutils.TestingResourceManager
@@ -372,6 +373,48 @@ class TestingCluster(
case _ => throw new IOException("Dispose savepoint failed")
}
}
+
+ @throws(classOf[IOException])
+ def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = {
+ val jobManagerGateway = getLeaderGateway(timeout)
+
+ // wait until the cluster is ready to take a checkpoint.
+ val allRunning = jobManagerGateway.ask(
+ TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout)
+
+ Await.ready(allRunning, timeout)
+
+ // trigger checkpoint
+ val result = Await.result(
+ jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), timeout)
+
+ result match {
+ case success: CheckpointRequestSuccess => success.path
+ case fail: CheckpointRequestFailure => {
+ if (fail.cause.getMessage.contains("tasks not ready")) {
+ // retry if the tasks are not ready yet.
+ Thread.sleep(50)
+ requestCheckpoint(jobId, options)
+ } else {
+ throw new IOException(fail.cause)
+ }
+ }
+ case _ => throw new IllegalStateException("Trigger checkpoint failed")
+ }
+ }
+
+ @throws[Exception]
+ def cancelJob(jobId: JobID): Unit = {
+ if (getCurrentlyRunningJobsJava.contains(jobId)) {
+ val jobManagerGateway = getLeaderGateway(timeout)
+ val cancelFuture = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout)
+ val result = Await.result(cancelFuture, timeout)
+ if (!result.isInstanceOf[JobManagerMessages.CancellationSuccess]) {
+ throw new Exception("Cancellation failed")
+ }
+ }
+ else throw new IllegalStateException("Job is not running")
+ }
}
object TestingCluster {
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index a3d31f5..3d3af95 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,14 +22,17 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
import akka.pattern.{ask, pipe}
import org.apache.flink.api.common.JobID
import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.concurrent.BiFunction
import org.apache.flink.runtime.execution.ExecutionState
import org.apache.flink.runtime.jobgraph.JobStatus
import org.apache.flink.runtime.jobmanager.JobManager
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
import org.apache.flink.runtime.messages.Acknowledge
import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.JobManagerMessages._
import org.apache.flink.runtime.messages.Messages.Disconnect
import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -336,6 +339,60 @@ trait TestingJobManagerLike extends FlinkActor {
}
}
+ case CheckpointRequest(jobId, checkpointOptions) =>
+ currentJobs.get(jobId) match {
+ case Some((graph, _)) =>
+ val checkpointCoordinator = graph.getCheckpointCoordinator()
+
+ if (checkpointCoordinator != null) {
+ // Immutable copy for the future
+ val senderRef = sender()
+ try {
+ // Do this async, because checkpoint coordinator operations can
+ // contain blocking calls to the state backend or ZooKeeper.
+ val cpFuture = checkpointCoordinator.triggerCheckpoint(
+ System.currentTimeMillis(),
+ checkpointOptions)
+
+ cpFuture.handleAsync[Void](
+ new BiFunction[CompletedCheckpoint, Throwable, Void] {
+ override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+ if (success != null) {
+ if (success.getExternalPointer == null &&
+ CheckpointType.SAVEPOINT.equals(checkpointOptions.getCheckpointType)) {
+ senderRef ! CheckpointRequestFailure(
+ jobId, new Exception("Savepoint has not been persisted.")
+ )
+ } else {
+ senderRef ! CheckpointRequestSuccess(
+ jobId,
+ success.getCheckpointID,
+ success.getExternalPointer,
+ success.getTimestamp)
+ }
+ } else {
+ senderRef ! CheckpointRequestFailure(
+ jobId, new Exception("Failed to complete checkpoint", cause))
+ }
+ null
+ }
+ },
+ context.dispatcher)
+ } catch {
+ case e: Exception =>
+ senderRef ! CheckpointRequestFailure(jobId, new Exception(
+ "Failed to trigger checkpoint", e))
+ }
+ } else {
+ sender() ! CheckpointRequestFailure(jobId, new IllegalStateException(
+ "Checkpointing disabled. You can enable it via the execution environment of " +
+ "your job."))
+ }
+
+ case None =>
+ sender() ! CheckpointRequestFailure(jobId, new IllegalArgumentException("Unknown job."))
+ }
+
case NotifyWhenLeader =>
if (leaderElectionService.hasLeadership) {
sender() ! true
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 48c2bfb..f79c124 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -23,10 +23,13 @@ import java.util.Map
import akka.actor.ActorRef
import org.apache.flink.api.common.JobID
import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.checkpoint.CheckpointOptions
import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph
import org.apache.flink.runtime.instance.ActorGateway
import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
object TestingJobManagerMessages {
@@ -59,6 +62,40 @@ object TestingJobManagerMessages {
case class TaskManagerTerminated(taskManager: ActorRef)
/**
+ * Triggers a checkpoint for the specified job.
+ *
+ * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
+ * control-flow message, which is *not* part of the checkpointing mechanism
+ * of triggering and acknowledging checkpoints.
+ *
+ * @param jobId The JobID of the job to trigger the checkpoint for.
+ * @param options properties of the checkpoint
+ */
+ case class CheckpointRequest(
+ jobId: JobID,
+ options: CheckpointOptions) extends RequiresLeaderSessionID
+
+ /**
+ * Response after a successful checkpoint trigger containing the checkpoint path.
+ *
+ * @param jobId The job ID for which the checkpoint was triggered.
+ * @param path The external pointer (path) of the completed checkpoint.
+ */
+ case class CheckpointRequestSuccess(
+ jobId: JobID,
+ checkpointId: Long,
+ path: String,
+ triggerTime: Long)
+
+ /**
+ * Response after a failed checkpoint trigger containing the failure cause.
+ *
+ * @param jobId The job ID for which the checkpoint was triggered.
+ * @param cause The cause of the failure.
+ */
+ case class CheckpointRequestFailure(jobId: JobID, cause: Throwable)
+
+ /**
* Registers a listener to receive a message when accumulators changed.
* The change must be explicitly triggered by the TestingTaskManager which can receive an
* [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]
http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
new file mode 100644
index 0000000..e341741
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.test.state.ManualWindowSpeedITCase;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * IT case for externalized checkpoints with {@link org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore}
+ * and {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ *
+ * <p>This test considers full and incremental checkpoints and was introduced to guard against problems like FLINK-6964.
+ */
+public class ExternalizedCheckpointITCase {
+
+ private static final int PARALLELISM = 2;
+ private static final int NUM_TASK_MANAGERS = 2;
+ private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+ @ClassRule
+ public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Test
+ public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
+ final File checkpointDir = temporaryFolder.newFolder();
+ testExternalizedCheckpoints(
+ checkpointDir,
+ null,
+ new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+ }
+
+ @Test
+ public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
+ final File checkpointDir = temporaryFolder.newFolder();
+ testExternalizedCheckpoints(
+ checkpointDir,
+ null,
+ new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+ }
+
+ @Test
+ public void testExternalizedFSCheckpointsStandalone() throws Exception {
+ final File checkpointDir = temporaryFolder.newFolder();
+ testExternalizedCheckpoints(
+ checkpointDir,
+ null,
+ new FsStateBackend(checkpointDir.toURI().toString(), true));
+
+ }
+
+ @Test
+ public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
+ TestingServer zkServer = new TestingServer();
+ zkServer.start();
+ try {
+ final File checkpointDir = temporaryFolder.newFolder();
+ testExternalizedCheckpoints(
+ checkpointDir,
+ zkServer.getConnectString(),
+ new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+ } finally {
+ zkServer.stop();
+ }
+ }
+
+ @Test
+ public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
+ TestingServer zkServer = new TestingServer();
+ zkServer.start();
+ try {
+ final File checkpointDir = temporaryFolder.newFolder();
+ testExternalizedCheckpoints(
+ checkpointDir,
+ zkServer.getConnectString(),
+ new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+ } finally {
+ zkServer.stop();
+ }
+ }
+
+ @Test
+ public void testExternalizedFSCheckpointsZookeeper() throws Exception {
+ TestingServer zkServer = new TestingServer();
+ zkServer.start();
+ try {
+ final File checkpointDir = temporaryFolder.newFolder();
+ testExternalizedCheckpoints(
+ checkpointDir,
+ zkServer.getConnectString(),
+ new FsStateBackend(checkpointDir.toURI().toString(), true));
+ } finally {
+ zkServer.stop();
+ }
+ }
+
+ private void testExternalizedCheckpoints(
+ File checkpointDir,
+ String zooKeeperQuorum,
+ AbstractStateBackend backend) throws Exception {
+
+ final Configuration config = new Configuration();
+
+ config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+
+ final File savepointDir = temporaryFolder.newFolder();
+
+ config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+ config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+ config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+
+ // ZooKeeper recovery mode?
+ if (zooKeeperQuorum != null) {
+ final File haDir = temporaryFolder.newFolder();
+ config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+ config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
+ config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+ }
+
+ TestingCluster cluster = new TestingCluster(config);
+ cluster.start();
+
+ String externalCheckpoint = null;
+
+ try {
+
+ // main test sequence: start job -> eCP -> restore job -> eCP -> restore job -> eCP
+ for (int i = 0; i < 3; ++i) {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ env.setStateBackend(backend);
+ env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+ env.setParallelism(PARALLELISM);
+
+ env.addSource(new ManualWindowSpeedITCase.InfiniteTupleSource(10_000))
+ .keyBy(0)
+ .timeWindow(Time.seconds(3))
+ .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Tuple2<String, Integer> reduce(
+ Tuple2<String, Integer> value1,
+ Tuple2<String, Integer> value2) throws Exception {
+ return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+ }
+ })
+ .filter(new FilterFunction<Tuple2<String, Integer>>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public boolean filter(Tuple2<String, Integer> value) throws Exception {
+ return value.f0.startsWith("Tuple 0");
+ }
+ });
+
+ StreamGraph streamGraph = env.getStreamGraph();
+ streamGraph.setJobName("Test");
+
+ JobGraph jobGraph = streamGraph.getJobGraph();
+
+ // recover from previous iteration?
+ if (externalCheckpoint != null) {
+ jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+ }
+
+ config.addAll(jobGraph.getJobConfiguration());
+ JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+
+ // let the job make some progress
+ Thread.sleep(200);
+
+ externalCheckpoint =
+ cluster.requestCheckpoint(submissionResult.getJobID(), CheckpointOptions.forFullCheckpoint());
+
+ cluster.cancelJob(submissionResult.getJobID());
+ }
+ } finally {
+ cluster.stop();
+ cluster.awaitTermination();
+ }
+ }
+}