You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/14 15:17:06 UTC

flink git commit: [FLINK-6964] [checkpoint] Fix externalized incremental checkpoints for StandaloneCompletedCheckpointStore

Repository: flink
Updated Branches:
  refs/heads/master 94d3166b4 -> 8cff17fcc


[FLINK-6964] [checkpoint] Fix externalized incremental checkpoints for StandaloneCompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cff17fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cff17fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cff17fc

Branch: refs/heads/master
Commit: 8cff17fcc9b4bca6499c26fc2a6318c759cbf251
Parents: 94d3166
Author: Stefan Richter <s....@data-artisans.com>
Authored: Wed Jun 21 11:22:28 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 14 15:52:09 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  12 +-
 .../checkpoint/CheckpointCoordinator.java       |  34 ++-
 .../savepoint/SavepointV2Serializer.java        |   7 +-
 .../state/IncrementalKeyedStateHandle.java      |  21 +-
 .../runtime/state/SharedStateRegistry.java      |  45 +++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  19 +-
 .../savepoint/CheckpointTestUtils.java          |   2 +-
 .../state/IncrementalKeyedStateHandleTest.java  |   3 +-
 .../runtime/testingUtils/TestingCluster.scala   |  49 +++-
 .../testingUtils/TestingJobManagerLike.scala    |  59 ++++-
 .../TestingJobManagerMessages.scala             |  39 +++-
 .../ExternalizedCheckpointITCase.java           | 228 +++++++++++++++++++
 12 files changed, 482 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 291973c..9d289b4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -185,6 +185,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The identifier of the last completed checkpoint. */
 	private long lastCompletedCheckpointId = -1;
 
+	/** Unique ID of this backend. */
+	private UUID backendUID;
+
 	private static final String SST_FILE_SUFFIX = ".sst";
 
 	public RocksDBKeyedStateBackend(
@@ -233,6 +236,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.kvStateInformation = new HashMap<>();
 		this.restoredKvStateMetaInfos = new HashMap<>();
 		this.materializedSstFiles = new TreeMap<>();
+		this.backendUID = UUID.randomUUID();
+		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
 	}
 
 	/**
@@ -926,7 +931,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 
 			return new IncrementalKeyedStateHandle(
-				stateBackend.operatorIdentifier,
+				stateBackend.backendUID,
 				stateBackend.keyGroupRange,
 				checkpointId,
 				sstFiles,
@@ -1438,6 +1443,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 						}
 					}
 				} else {
+					// pick up the old backend id again, so that we can reference existing state
+					stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
+
+					LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
+						stateBackend.operatorIdentifier, stateBackend.backendUID);
 
 					// create hard links in the instance directory
 					if (!stateBackend.instanceRocksDBPath.mkdirs()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 905d132..3e36158 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -417,6 +418,36 @@ public class CheckpointCoordinator {
 		return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
 	}
 
+	/**
+	 * Test method to trigger a checkpoint/savepoint.
+	 *
+	 * @param timestamp The timestamp for the checkpoint.
+	 * @param options The checkpoint options.
+	 * @return A future to the completed checkpoint
+	 */
+	@VisibleForTesting
+	@Internal
+	public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, CheckpointOptions options) throws Exception {
+		switch (options.getCheckpointType()) {
+			case SAVEPOINT:
+				return triggerSavepoint(timestamp, options.getTargetLocation());
+
+			case FULL_CHECKPOINT:
+				CheckpointTriggerResult triggerResult =
+					triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, false);
+
+				if (triggerResult.isSuccess()) {
+					return triggerResult.getPendingCheckpoint().getCompletionFuture();
+				} else {
+					Throwable cause = new Exception("Failed to trigger checkpoint: " + triggerResult.getFailureReason().message());
+					return FlinkCompletableFuture.completedExceptionally(cause);
+				}
+
+			default:
+				throw new IllegalArgumentException("Unknown checkpoint type: " + options.getCheckpointType());
+		}
+	}
+
 	@VisibleForTesting
 	CheckpointTriggerResult triggerCheckpoint(
 			long timestamp,
@@ -1092,6 +1123,7 @@ public class CheckpointCoordinator {
 		CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
 				job, tasks, savepointPath, userClassLoader, allowNonRestored);
 
+		savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
 		completedCheckpointStore.addCheckpoint(savepoint);
 		
 		// Reset the checkpoint ID counter
@@ -1099,7 +1131,7 @@ public class CheckpointCoordinator {
 		checkpointIdCounter.setCount(nextCheckpointId);
 		
 		LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
-		
+
 		return restoreLatestCheckpointedState(tasks, true, allowNonRestored);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index da0022c..c8d695f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * (De)serializer for checkpoint metadata format version 2.
@@ -320,7 +321,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
 
 			dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
-			dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier());
+			dos.writeUTF(String.valueOf(incrementalKeyedStateHandle.getBackendIdentifier()));
 			dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
 			dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
 
@@ -380,7 +381,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 		} else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
 
 			long checkpointId = dis.readLong();
-			String operatorId = dis.readUTF();
+			String backendId = dis.readUTF();
 			int startKeyGroup = dis.readInt();
 			int numKeyGroups = dis.readInt();
 			KeyGroupRange keyGroupRange =
@@ -391,7 +392,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
 			Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
 
 			return new IncrementalKeyedStateHandle(
-				operatorId,
+				UUID.fromString(backendId),
 				keyGroupRange,
 				checkpointId,
 				sharedStates,

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 770b5a9..0085890 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * The handle to states of an incremental snapshot.
@@ -57,9 +59,10 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private static final long serialVersionUID = -8328808513197388231L;
 
 	/**
-	 * The operator instance identifier for this handle
+	 * UUID to identify the backend which created this state handle. This is used in creating the key for the
+	 * {@link SharedStateRegistry}.
 	 */
-	private final String operatorIdentifier;
+	private final UUID backendIdentifier;
 
 	/**
 	 * The key-group range covered by this state handle
@@ -97,14 +100,14 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	private transient SharedStateRegistry sharedStateRegistry;
 
 	public IncrementalKeyedStateHandle(
-		String operatorIdentifier,
+		UUID backendIdentifier,
 		KeyGroupRange keyGroupRange,
 		long checkpointId,
 		Map<StateHandleID, StreamStateHandle> sharedState,
 		Map<StateHandleID, StreamStateHandle> privateState,
 		StreamStateHandle metaStateHandle) {
 
-		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
+		this.backendIdentifier = Preconditions.checkNotNull(backendIdentifier);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.checkpointId = checkpointId;
 		this.sharedState = Preconditions.checkNotNull(sharedState);
@@ -134,8 +137,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		return metaStateHandle;
 	}
 
-	public String getOperatorIdentifier() {
-		return operatorIdentifier;
+	public UUID getBackendIdentifier() {
+		return backendIdentifier;
 	}
 
 	@Override
@@ -231,7 +234,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	 */
 	@VisibleForTesting
 	public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
-		return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId);
+		return new SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, shId);
 	}
 
 	/**
@@ -252,7 +255,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 		if (getCheckpointId() != that.getCheckpointId()) {
 			return false;
 		}
-		if (!getOperatorIdentifier().equals(that.getOperatorIdentifier())) {
+		if (!getBackendIdentifier().equals(that.getBackendIdentifier())) {
 			return false;
 		}
 		if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
@@ -273,7 +276,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
 	@VisibleForTesting
 	@Override
 	public int hashCode() {
-		int result = getOperatorIdentifier().hashCode();
+		int result = getBackendIdentifier().hashCode();
 		result = 31 * result + getKeyGroupRange().hashCode();
 		result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
 		result = 31 * result + getSharedState().hashCode();

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index af9ac9d..e0ca873 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ public class SharedStateRegistry {
 	/** Executor for async state deletion */
 	private final Executor asyncDisposalExecutor;
 
+	/** Default uses direct executor to delete unreferenced state */
 	public SharedStateRegistry() {
 		this(Executors.directExecutor());
 	}
@@ -83,11 +85,16 @@ public class SharedStateRegistry {
 			entry = registeredStates.get(registrationKey);
 
 			if (entry == null) {
+
+				// Additional check that should never fail, because only state handles that are not placeholders should
+				// ever be inserted to the registry.
+				Preconditions.checkState(!isPlaceholder(state), "Attempt to reference unknown state: " + registrationKey);
+
 				entry = new SharedStateRegistry.SharedStateEntry(state);
 				registeredStates.put(registrationKey, entry);
 			} else {
 				// delete if this is a real duplicate
-				if (!Objects.equals(state, entry.state)) {
+				if (!Objects.equals(state, entry.stateHandle)) {
 					scheduledStateDeletion = state;
 				}
 				entry.increaseReferenceCount();
@@ -95,6 +102,7 @@ public class SharedStateRegistry {
 		}
 
 		scheduleAsyncDelete(scheduledStateDeletion);
+		LOG.trace("Registered shared state {} under key {}.", entry, registrationKey);
 		return new Result(entry);
 	}
 
@@ -112,9 +120,10 @@ public class SharedStateRegistry {
 
 		final Result result;
 		final StreamStateHandle scheduledStateDeletion;
+		SharedStateRegistry.SharedStateEntry entry;
 
 		synchronized (registeredStates) {
-			SharedStateRegistry.SharedStateEntry entry = registeredStates.get(registrationKey);
+			entry = registeredStates.get(registrationKey);
 
 			Preconditions.checkState(entry != null,
 				"Cannot unregister a state that is not registered.");
@@ -124,7 +133,7 @@ public class SharedStateRegistry {
 			// Remove the state from the registry when it's not referenced any more.
 			if (entry.getReferenceCount() <= 0) {
 				registeredStates.remove(registrationKey);
-				scheduledStateDeletion = entry.getState();
+				scheduledStateDeletion = entry.getStateHandle();
 				result = new Result(null, 0);
 			} else {
 				scheduledStateDeletion = null;
@@ -132,6 +141,7 @@ public class SharedStateRegistry {
 			}
 		}
 
+		LOG.trace("Unregistered shared state {} under key {}.", entry, registrationKey);
 		scheduleAsyncDelete(scheduledStateDeletion);
 		return result;
 	}
@@ -142,6 +152,7 @@ public class SharedStateRegistry {
 	 * @param stateHandles The shared states to register.
 	 */
 	public void registerAll(Iterable<? extends CompositeStateHandle> stateHandles) {
+
 		if (stateHandles == null) {
 			return;
 		}
@@ -156,6 +167,8 @@ public class SharedStateRegistry {
 	private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
 		// We do the small optimization to not issue discards for placeholders, which are NOPs.
 		if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
+
+			LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
 			asyncDisposalExecutor.execute(
 				new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
 		}
@@ -171,18 +184,18 @@ public class SharedStateRegistry {
 	private static class SharedStateEntry {
 
 		/** The shared state handle */
-		private final StreamStateHandle state;
+		private final StreamStateHandle stateHandle;
 
 		/** The current reference count of the state handle */
 		private int referenceCount;
 
 		SharedStateEntry(StreamStateHandle value) {
-			this.state = value;
+			this.stateHandle = value;
 			this.referenceCount = 1;
 		}
 
-		StreamStateHandle getState() {
-			return state;
+		StreamStateHandle getStateHandle() {
+			return stateHandle;
 		}
 
 		int getReferenceCount() {
@@ -196,6 +209,14 @@ public class SharedStateRegistry {
 		void decreaseReferenceCount() {
 			--referenceCount;
 		}
+
+		@Override
+		public String toString() {
+			return "SharedStateEntry{" +
+				"stateHandle=" + stateHandle +
+				", referenceCount=" + referenceCount +
+				'}';
+		}
 	}
 
 	/**
@@ -210,7 +231,7 @@ public class SharedStateRegistry {
 		private final int referenceCount;
 
 		private Result(SharedStateEntry sharedStateEntry) {
-			this.reference = sharedStateEntry.getState();
+			this.reference = sharedStateEntry.getStateHandle();
 			this.referenceCount = sharedStateEntry.getReferenceCount();
 		}
 
@@ -228,6 +249,14 @@ public class SharedStateRegistry {
 		public int getReferenceCount() {
 			return referenceCount;
 		}
+
+		@Override
+		public String toString() {
+			return "Result{" +
+				"reference=" + reference +
+				", referenceCount=" + referenceCount +
+				'}';
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index b5854dd..91bab85 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -31,6 +25,13 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -51,6 +52,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -160,9 +162,12 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 			stateStorage,
 			Executors.directExecutor());
 
-		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+		SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
 		zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
 
+		verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+		verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+
 		CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 
 		// check that we return the latest retrievable checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index f985573..de1f599 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -274,7 +274,7 @@ public class CheckpointTestUtils {
 
 	public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
 		return new IncrementalKeyedStateHandle(
-			createRandomUUID(rnd).toString(),
+			createRandomUUID(rnd),
 			new KeyGroupRange(1, 1),
 			42L,
 			createRandomStateHandleMap(rnd),

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index 2a6975a..c1b3ccd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -187,7 +188,7 @@ public class IncrementalKeyedStateHandleTest {
 
 	private static IncrementalKeyedStateHandle create(Random rnd) {
 		return new IncrementalKeyedStateHandle(
-			"test",
+			UUID.nameUUIDFromBytes("test".getBytes()),
 			KeyGroupRange.of(0, 0),
 			1L,
 			placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9e0a6e1..0e3eae5 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,10 +26,10 @@ import akka.pattern.Patterns._
 import akka.pattern.ask
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{ConfigConstants, Configuration, JobManagerOptions}
+import org.apache.flink.configuration.{Configuration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.checkpoint.{CheckpointOptions, CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -39,11 +39,12 @@ import org.apache.flink.runtime.instance.{ActorGateway, InstanceManager}
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{CheckpointRequest, CheckpointRequestFailure, CheckpointRequestSuccess, ResponseSavepoint}
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
@@ -372,6 +373,48 @@ class TestingCluster(
       case _ => throw new IOException("Dispose savepoint failed")
     }
   }
+
+  @throws(classOf[IOException])
+  def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = {
+    val jobManagerGateway = getLeaderGateway(timeout)
+
+    // wait until the cluster is ready to take a checkpoint.
+    val allRunning = jobManagerGateway.ask(
+      TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout)
+
+    Await.ready(allRunning, timeout)
+
+    // trigger checkpoint
+    val result = Await.result(
+      jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), timeout)
+
+    result match {
+      case success: CheckpointRequestSuccess => success.path
+      case fail: CheckpointRequestFailure => {
+        if (fail.cause.getMessage.contains("tasks not ready")) {
+          // retry if the tasks are not ready yet.
+          Thread.sleep(50)
+          requestCheckpoint(jobId, options)
+        } else {
+          throw new IOException(fail.cause)
+        }
+      }
+      case _ => throw new IllegalStateException("Trigger checkpoint failed")
+    }
+  }
+
+  @throws[Exception]
+  def cancelJob(jobId: JobID): Unit = {
+    if (getCurrentlyRunningJobsJava.contains(jobId)) {
+      val jobManagerGateway = getLeaderGateway(timeout)
+      val cancelFuture = jobManagerGateway.ask(new JobManagerMessages.CancelJob(jobId), timeout)
+      val result = Await.result(cancelFuture, timeout)
+      if (!result.isInstanceOf[JobManagerMessages.CancellationSuccess]) {
+        throw new Exception("Cancellation failed")
+      }
+    }
+    else throw new IllegalStateException("Job is not running")
+  }
  }
 
 object TestingCluster {

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index a3d31f5..3d3af95 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,14 +22,17 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.concurrent.BiFunction
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
 import org.apache.flink.runtime.messages.Acknowledge
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -336,6 +339,60 @@ trait TestingJobManagerLike extends FlinkActor {
         }
       }
 
+    case CheckpointRequest(jobId, checkpointOptions) =>
+      currentJobs.get(jobId) match {
+        case Some((graph, _)) =>
+          val checkpointCoordinator = graph.getCheckpointCoordinator()
+
+          if (checkpointCoordinator != null) {
+            // Immutable copy for the future
+            val senderRef = sender()
+            try {
+              // Do this async, because checkpoint coordinator operations can
+              // contain blocking calls to the state backend or ZooKeeper.
+              val cpFuture = checkpointCoordinator.triggerCheckpoint(
+                System.currentTimeMillis(),
+                checkpointOptions)
+
+              cpFuture.handleAsync[Void](
+                new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                  override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+                    if (success != null) {
+                      if (success.getExternalPointer == null &&
+                        CheckpointType.SAVEPOINT.equals(checkpointOptions.getCheckpointType)) {
+                        senderRef ! CheckpointRequestFailure(
+                          jobId, new Exception("Savepoint has not been persisted.")
+                        )
+                      } else {
+                        senderRef ! CheckpointRequestSuccess(
+                          jobId,
+                          success.getCheckpointID,
+                          success.getExternalPointer,
+                          success.getTimestamp)
+                      }
+                    } else {
+                      senderRef ! CheckpointRequestFailure(
+                        jobId, new Exception("Failed to complete checkpoint", cause))
+                    }
+                    null
+                  }
+                },
+                context.dispatcher)
+            } catch {
+              case e: Exception =>
+                senderRef ! CheckpointRequestFailure(jobId, new Exception(
+                  "Failed to trigger checkpoint", e))
+            }
+          } else {
+            sender() ! CheckpointRequestFailure(jobId, new IllegalStateException(
+              "Checkpointing disabled. You can enable it via the execution environment of " +
+                "your job."))
+          }
+
+        case None =>
+          sender() ! CheckpointRequestFailure(jobId, new IllegalArgumentException("Unknown job."))
+      }
+
     case NotifyWhenLeader =>
       if (leaderElectionService.hasLeadership) {
         sender() ! true

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 48c2bfb..f79c124 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -23,10 +23,13 @@ import java.util.Map
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.checkpoint.CheckpointOptions
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph
 import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
 
 object TestingJobManagerMessages {
 
@@ -59,6 +62,40 @@ object TestingJobManagerMessages {
   case class TaskManagerTerminated(taskManager: ActorRef)
 
   /**
+    * Triggers a checkpoint for the specified job.
+    *
+    * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
+    * control-flow message, which is *not* part of the checkpointing mechanism
+    * of triggering and acknowledging checkpoints.
+    *
+    * @param jobId The JobID of the job to trigger the checkpoint for.
+    * @param options properties of the checkpoint
+    */
+  case class CheckpointRequest(
+    jobId: JobID,
+    options: CheckpointOptions) extends RequiresLeaderSessionID
+
+  /**
+    * Response after a successful checkpoint trigger containing the checkpoint path.
+    *
+    * @param jobId The job ID for which the checkpoint was triggered.
+    * @param path  The external pointer (path) of the completed checkpoint.
+    */
+  case class CheckpointRequestSuccess(
+    jobId: JobID,
+    checkpointId: Long,
+    path: String,
+    triggerTime: Long)
+
+  /**
+    * Response after a failed checkpoint trigger containing the failure cause.
+    *
+    * @param jobId The job ID for which the checkpoint was triggered.
+    * @param cause The cause of the failure.
+    */
+  case class CheckpointRequestFailure(jobId: JobID, cause: Throwable)
+
+  /**
    * Registers a listener to receive a message when accumulators changed.
    * The change must be explicitly triggered by the TestingTaskManager which can receive an
    * [[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
new file mode 100644
index 0000000..e341741
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.test.state.ManualWindowSpeedITCase;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * IT case for externalized checkpoints with {@link org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore}
+ * and {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ *
+ * <p>This test considers full and incremental checkpoints and was introduced to guard against problems like FLINK-6964.
+ */
+public class ExternalizedCheckpointITCase {
+
+	private static final int PARALLELISM = 2;
+	private static final int NUM_TASK_MANAGERS = 2;
+	private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+	@ClassRule
+	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Test
+	public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
+		final File checkpointDir = temporaryFolder.newFolder();
+		testExternalizedCheckpoints(
+			checkpointDir,
+			null,
+			new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+	}
+
+	@Test
+	public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
+		final File checkpointDir = temporaryFolder.newFolder();
+		testExternalizedCheckpoints(
+			checkpointDir,
+			null,
+			new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+	}
+
+	@Test
+	public void testExternalizedFSCheckpointsStandalone() throws Exception {
+		final File checkpointDir = temporaryFolder.newFolder();
+		testExternalizedCheckpoints(
+			checkpointDir,
+			null,
+			new FsStateBackend(checkpointDir.toURI().toString(), true));
+
+	}
+
+	@Test
+	public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
+		TestingServer zkServer = new TestingServer();
+		zkServer.start();
+		try {
+			final File checkpointDir = temporaryFolder.newFolder();
+			testExternalizedCheckpoints(
+				checkpointDir,
+				zkServer.getConnectString(),
+				new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+		} finally {
+			zkServer.stop();
+		}
+	}
+
+	@Test
+	public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
+		TestingServer zkServer = new TestingServer();
+		zkServer.start();
+		try {
+			final File checkpointDir = temporaryFolder.newFolder();
+			testExternalizedCheckpoints(
+				checkpointDir,
+				zkServer.getConnectString(),
+				new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+		} finally {
+			zkServer.stop();
+		}
+	}
+
+	@Test
+	public void testExternalizedFSCheckpointsZookeeper() throws Exception {
+		TestingServer zkServer = new TestingServer();
+		zkServer.start();
+		try {
+			final File checkpointDir = temporaryFolder.newFolder();
+			testExternalizedCheckpoints(
+				checkpointDir,
+				zkServer.getConnectString(),
+				new FsStateBackend(checkpointDir.toURI().toString(), true));
+		} finally {
+			zkServer.stop();
+		}
+	}
+
+	private void testExternalizedCheckpoints(
+		File checkpointDir,
+		String zooKeeperQuorum,
+		AbstractStateBackend backend) throws Exception {
+
+		final Configuration config = new Configuration();
+
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+
+		final File savepointDir = temporaryFolder.newFolder();
+
+		config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+		config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+
+		// ZooKeeper recovery mode?
+		if (zooKeeperQuorum != null) {
+			final File haDir = temporaryFolder.newFolder();
+			config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+		}
+
+		TestingCluster cluster = new TestingCluster(config);
+		cluster.start();
+
+		String externalCheckpoint = null;
+
+		try {
+
+			// main test sequence:  start job -> eCP -> restore job -> eCP -> restore job -> eCP
+			for (int i = 0; i < 3; ++i) {
+				final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+				env.setStateBackend(backend);
+				env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+				env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+				env.setParallelism(PARALLELISM);
+
+				env.addSource(new ManualWindowSpeedITCase.InfiniteTupleSource(10_000))
+					.keyBy(0)
+					.timeWindow(Time.seconds(3))
+					.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+						private static final long serialVersionUID = 1L;
+
+						@Override
+						public Tuple2<String, Integer> reduce(
+							Tuple2<String, Integer> value1,
+							Tuple2<String, Integer> value2) throws Exception {
+							return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+						}
+					})
+					.filter(new FilterFunction<Tuple2<String, Integer>>() {
+						private static final long serialVersionUID = 1L;
+
+						@Override
+						public boolean filter(Tuple2<String, Integer> value) throws Exception {
+							return value.f0.startsWith("Tuple 0");
+						}
+					});
+
+				StreamGraph streamGraph = env.getStreamGraph();
+				streamGraph.setJobName("Test");
+
+				JobGraph jobGraph = streamGraph.getJobGraph();
+
+				// recover from previous iteration?
+				if (externalCheckpoint != null) {
+					jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+				}
+
+				config.addAll(jobGraph.getJobConfiguration());
+				JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+
+				// let the job do some progress
+				Thread.sleep(200);
+
+				externalCheckpoint =
+					cluster.requestCheckpoint(submissionResult.getJobID(), CheckpointOptions.forFullCheckpoint());
+
+				cluster.cancelJob(submissionResult.getJobID());
+			}
+		} finally {
+			cluster.stop();
+			cluster.awaitTermination();
+		}
+	}
+}