You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/20 13:46:05 UTC
flink git commit: [FLINK-6640] Ensure registration of shared state
happens before externalizing a checkpoint
Repository: flink
Updated Branches:
refs/heads/master b93396c5a -> 040356391
[FLINK-6640] Ensure registration of shared state happens before externalizing a checkpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/04035639
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/04035639
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/04035639
Branch: refs/heads/master
Commit: 040356391f621858199948b15160b1e382152cc1
Parents: b93396c
Author: Stefan Richter <s....@data-artisans.com>
Authored: Fri May 19 16:46:06 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Sat May 20 15:45:45 2017 +0200
----------------------------------------------------------------------
.../AbstractCompletedCheckpointStore.java | 43 --------------
.../checkpoint/CheckpointCoordinator.java | 17 +++---
.../runtime/checkpoint/CompletedCheckpoint.java | 6 +-
.../checkpoint/CompletedCheckpointStore.java | 5 +-
.../runtime/checkpoint/PendingCheckpoint.java | 1 -
.../StandaloneCompletedCheckpointStore.java | 11 ++--
.../ZooKeeperCompletedCheckpointStore.java | 26 ++++-----
.../runtime/state/SharedStateRegistry.java | 16 +++++-
.../CheckpointCoordinatorFailureTest.java | 3 +-
.../CompletedCheckpointStoreTest.java | 33 +++++++----
.../checkpoint/CompletedCheckpointTest.java | 14 ++---
.../StandaloneCompletedCheckpointStoreTest.java | 21 +++----
...ZooKeeperCompletedCheckpointStoreITCase.java | 60 ++++++++++++--------
.../ZooKeeperCompletedCheckpointStoreTest.java | 4 +-
.../RecoverableCompletedCheckpointStore.java | 13 ++---
.../JobManagerHACheckpointRecoveryITCase.java | 37 +++++++-----
16 files changed, 156 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
deleted file mode 100644
index bf70501..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCompletedCheckpointStore.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.runtime.state.SharedStateRegistry;
-
-import java.util.concurrent.Executor;
-
-/**
- * This is the base class that provides implementation of some aspects common for all
- * {@link CompletedCheckpointStore}s.
- */
-public abstract class AbstractCompletedCheckpointStore implements CompletedCheckpointStore {
-
- /**
- * Registry for shared states.
- */
- protected final SharedStateRegistry sharedStateRegistry;
-
- public AbstractCompletedCheckpointStore() {
- this.sharedStateRegistry = new SharedStateRegistry();
- }
-
- public AbstractCompletedCheckpointStore(Executor asyncIOExecutor) {
- this.sharedStateRegistry = new SharedStateRegistry(asyncIOExecutor);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0bbf6b5..e224780 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -40,11 +40,11 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StringUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -173,6 +173,9 @@ public class CheckpointCoordinator {
@Nullable
private CheckpointStatsTracker statsTracker;
+ /** Registry that tracks state which is shared across (incremental) checkpoints */
+ private final SharedStateRegistry sharedStateRegistry;
+
// --------------------------------------------------------------------------------------------
public CheckpointCoordinator(
@@ -226,6 +229,7 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
+ this.sharedStateRegistry = new SharedStateRegistry(executor);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -836,6 +840,10 @@ public class CheckpointCoordinator {
final long checkpointId = pendingCheckpoint.getCheckpointId();
final CompletedCheckpoint completedCheckpoint;
+ // As a first step to complete the checkpoint, we register its state with the registry
+ Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
+ sharedStateRegistry.registerAll(operatorStates.values());
+
try {
try {
// externalize the checkpoint if required
@@ -1003,7 +1011,7 @@ public class CheckpointCoordinator {
}
// Recover the checkpoints
- completedCheckpointStore.recover();
+ completedCheckpointStore.recover(sharedStateRegistry);
// restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
@@ -1120,11 +1128,6 @@ public class CheckpointCoordinator {
return completedCheckpointStore;
}
-// @VisibleForTesting
-// SharedStateRegistry getSharedStateRegistry() {
-// return sharedStateRegistry;
-// }
-
public CheckpointIDCounter getCheckpointIdCounter() {
return checkpointIdCounter;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index b382080..7c3edee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -178,7 +178,7 @@ public class CompletedCheckpoint implements Serializable {
doDiscard();
}
- public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public boolean discardOnSubsume() throws Exception {
if (props.discardOnSubsumed()) {
doDiscard();
@@ -188,7 +188,7 @@ public class CompletedCheckpoint implements Serializable {
return false;
}
- public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception {
+ public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() ||
@@ -290,7 +290,7 @@ public class CompletedCheckpoint implements Serializable {
*
* @param sharedStateRegistry The registry where shared states are registered
*/
- public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
+ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) {
sharedStateRegistry.registerAll(operatorStates.values());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 82193b5..45d407e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import java.util.List;
@@ -32,8 +33,10 @@ public interface CompletedCheckpointStore {
*
* <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
* available checkpoint.
+ *
+ * @param sharedStateRegistry the shared state registry to register recovered states.
*/
- void recover() throws Exception;
+ void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
/**
* Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 370032a..0633fec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 233cfc8..fbb0198 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
/**
* {@link CompletedCheckpointStore} for JobManagers running in {@link HighAvailabilityMode#NONE}.
*/
-public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpointStore {
+public class StandaloneCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
@@ -56,21 +57,19 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
}
@Override
- public void recover() throws Exception {
+ public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
// Nothing to do
}
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
-
- checkpoint.registerSharedStates(sharedStateRegistry);
checkpoints.addLast(checkpoint);
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
- checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
+ checkpointToSubsume.discardOnSubsume();
} catch (Exception e) {
LOG.warn("Fail to subsume the old checkpoint.", e);
}
@@ -103,7 +102,7 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
LOG.info("Shutting down");
for (CompletedCheckpoint checkpoint : checkpoints) {
- checkpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+ checkpoint.discardOnShutdown(jobStatus);
}
} finally {
checkpoints.clear();
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 4c3c1ff..469c1b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -64,7 +64,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* checkpoints is consistent. Currently, after recovery we start out with only a single
* checkpoint to circumvent those situations.
*/
-public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpointStore {
+public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCompletedCheckpointStore.class);
@@ -102,8 +102,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage,
Executor executor) throws Exception {
- super(executor);
-
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage");
@@ -139,13 +137,14 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
* that the history of checkpoints is consistent.
*/
@Override
- public void recover() throws Exception {
+ public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper.");
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
completedCheckpoints.clear();
+ sharedStateRegistry.clear();
// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -171,7 +170,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
// Re-register all shared states in the checkpoint.
- completedCheckpoint.registerSharedStates(sharedStateRegistry);
+ completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
@@ -195,9 +194,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
final String path = checkpointIdToPath(checkpoint.getCheckpointID());
- // First, register all shared states in the checkpoint to consolidates placeholder.
- checkpoint.registerSharedStates(sharedStateRegistry);
-
// Now add the new one. If it fails, we don't want to lose existing data.
checkpointsInZooKeeper.addAndLock(path, checkpoint);
@@ -206,7 +202,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
// Everything worked, let's remove a previous checkpoint if necessary.
while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
- removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
+ removeSubsumed(completedCheckpoints.removeFirst());
} catch (Exception e) {
LOG.warn("Failed to subsume the old checkpoint", e);
}
@@ -248,7 +244,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
for (CompletedCheckpoint checkpoint : completedCheckpoints) {
try {
- removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
+ removeShutdown(checkpoint, jobStatus);
} catch (Exception e) {
LOG.error("Failed to discard checkpoint.", e);
}
@@ -274,8 +270,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
// ------------------------------------------------------------------------
private void removeSubsumed(
- final CompletedCheckpoint completedCheckpoint,
- final SharedStateRegistry sharedStateRegistry) throws Exception {
+ final CompletedCheckpoint completedCheckpoint) throws Exception {
if(completedCheckpoint == null) {
return;
@@ -287,7 +282,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
if (value != null) {
try {
- completedCheckpoint.discardOnSubsume(sharedStateRegistry);
+ completedCheckpoint.discardOnSubsume();
} catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
}
@@ -302,8 +297,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
private void removeShutdown(
final CompletedCheckpoint completedCheckpoint,
- final JobStatus jobStatus,
- final SharedStateRegistry sharedStateRegistry) throws Exception {
+ final JobStatus jobStatus) throws Exception {
if(completedCheckpoint == null) {
return;
@@ -313,7 +307,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
@Override
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
try {
- completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+ completedCheckpoint.discardOnShutdown(jobStatus);
} catch (Exception e) {
throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index a5e0f84..949839b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -29,10 +29,13 @@ import java.util.Objects;
import java.util.concurrent.Executor;
/**
+ * This registry manages state that is shared across (incremental) checkpoints, and is responsible
+ * for deleting shared state that is no longer used in any valid checkpoint.
+ *
* A {@code SharedStateRegistry} will be deployed in the
- * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
- * maintain the reference count of {@link StreamStateHandle}s which are shared
- * among different incremental checkpoints.
+ * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to
+ * maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
+ * them.
*/
public class SharedStateRegistry {
@@ -247,4 +250,11 @@ public class SharedStateRegistry {
}
}
}
+
+ /**
+ * Clears the registry.
+ */
+ public void clear() {
+ registeredStates.clear();
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 6e20be3..344b340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.TestLogger;
import org.junit.Test;
@@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
- public void recover() throws Exception {
+ public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
throw new UnsupportedOperationException("Not implemented.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index fb5d7c3..1fe4e65 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -64,6 +64,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
*/
@Test
public void testAddAndGetLatestCheckpoint() throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
// Empty state
@@ -71,7 +72,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
assertEquals(0, checkpoints.getAllCheckpoints().size());
TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
- createCheckpoint(0), createCheckpoint(1) };
+ createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry) };
// Add and get latest
checkpoints.addCheckpoint(expected[0]);
@@ -89,11 +90,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
*/
@Test
public void testAddCheckpointMoreThanMaxRetained() throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
- createCheckpoint(0), createCheckpoint(1),
- createCheckpoint(2), createCheckpoint(3)
+ createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry),
+ createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry)
};
// Add checkpoints
@@ -134,11 +136,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
*/
@Test
public void testGetAllCheckpoints() throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
- createCheckpoint(0), createCheckpoint(1),
- createCheckpoint(2), createCheckpoint(3)
+ createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry),
+ createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry)
};
for (TestCompletedCheckpoint checkpoint : expected) {
@@ -159,11 +162,12 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
*/
@Test
public void testDiscardAllCheckpoints() throws Exception {
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
- createCheckpoint(0), createCheckpoint(1),
- createCheckpoint(2), createCheckpoint(3)
+ createCheckpoint(0, sharedStateRegistry), createCheckpoint(1, sharedStateRegistry),
+ createCheckpoint(2, sharedStateRegistry), createCheckpoint(3, sharedStateRegistry)
};
for (TestCompletedCheckpoint checkpoint : expected) {
@@ -187,7 +191,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
// ---------------------------------------------------------------------------------------------
- protected TestCompletedCheckpoint createCheckpoint(int id) throws IOException {
+ protected TestCompletedCheckpoint createCheckpoint(
+ int id,
+ SharedStateRegistry sharedStateRegistry) throws IOException {
+
int numberOfStates = 4;
CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
@@ -204,6 +211,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
operatorState.putState(i, subtaskState);
}
+ operatorState.registerSharedStates(sharedStateRegistry);
+
return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
}
@@ -251,8 +260,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
}
@Override
- public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
- if (super.discardOnSubsume(sharedStateRegistry)) {
+ public boolean discardOnSubsume() throws Exception {
+ if (super.discardOnSubsume()) {
discard();
return true;
} else {
@@ -261,8 +270,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
}
@Override
- public boolean discardOnShutdown(JobStatus jobStatus, SharedStateRegistry sharedStateRegistry) throws Exception {
- if (super.discardOnShutdown(jobStatus, sharedStateRegistry)) {
+ public boolean discardOnShutdown(JobStatus jobStatus) throws Exception {
+ if (super.discardOnShutdown(jobStatus)) {
discard();
return true;
} else {
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0bbb961..4846879 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -67,7 +67,7 @@ public class CompletedCheckpointTest {
new FileStateHandle(new Path(file.toURI()), file.length()),
file.getAbsolutePath());
- checkpoint.discardOnShutdown(JobStatus.FAILED, new SharedStateRegistry());
+ checkpoint.discardOnShutdown(JobStatus.FAILED);
assertEquals(false, file.exists());
}
@@ -93,11 +93,11 @@ public class CompletedCheckpointTest {
null);
SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
- checkpoint.registerSharedStates(sharedStateRegistry);
+ checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
verify(state, times(1)).registerSharedStates(sharedStateRegistry);
// Subsume
- checkpoint.discardOnSubsume(sharedStateRegistry);
+ checkpoint.discardOnSubsume();
verify(state, times(1)).discardState();
}
@@ -132,9 +132,9 @@ public class CompletedCheckpointTest {
externalPath);
SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
- checkpoint.registerSharedStates(sharedStateRegistry);
+ checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
- checkpoint.discardOnShutdown(status, sharedStateRegistry);
+ checkpoint.discardOnShutdown(status);
verify(state, times(0)).discardState();
assertEquals(true, file.exists());
@@ -148,7 +148,7 @@ public class CompletedCheckpointTest {
null,
null);
- checkpoint.discardOnShutdown(status, sharedStateRegistry);
+ checkpoint.discardOnShutdown(status);
verify(state, times(1)).discardState();
}
}
@@ -176,7 +176,7 @@ public class CompletedCheckpointTest {
CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
completed.setDiscardCallback(callback);
- completed.discardOnShutdown(JobStatus.FINISHED, new SharedStateRegistry());
+ completed.discardOnShutdown(JobStatus.FINISHED);
verify(callback, times(1)).notifyDiscardedCheckpoint();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index be94762..6f3c60b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -30,7 +30,6 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.powermock.api.mockito.PowerMockito.doReturn;
import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mock;
@@ -41,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointStoreTest {
@Override
- protected AbstractCompletedCheckpointStore createCompletedCheckpoints(
+ protected CompletedCheckpointStore createCompletedCheckpoints(
int maxNumberOfCheckpointsToRetain) throws Exception {
return new StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
@@ -52,13 +51,14 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
*/
@Test
public void testShutdownDiscardsCheckpoints() throws Exception {
- AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1);
- TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ CompletedCheckpointStore store = createCompletedCheckpoints(1);
+ TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
Collection<OperatorState> operatorStates = checkpoint.getOperatorStates().values();
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
- verifyCheckpointRegistered(operatorStates, store.sharedStateRegistry);
+ verifyCheckpointRegistered(operatorStates, sharedStateRegistry);
store.shutdown(JobStatus.FINISHED);
assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -72,13 +72,14 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
*/
@Test
public void testSuspendDiscardsCheckpoints() throws Exception {
- AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1);
- TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ CompletedCheckpointStore store = createCompletedCheckpoints(1);
+ TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
Collection<OperatorState> taskStates = checkpoint.getOperatorStates().values();
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
- verifyCheckpointRegistered(taskStates, store.sharedStateRegistry);
+ verifyCheckpointRegistered(taskStates, sharedStateRegistry);
store.shutdown(JobStatus.SUSPENDED);
assertEquals(0, store.getNumberOfRetainedCheckpoints());
@@ -92,7 +93,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
*/
@Test
public void testAddCheckpointWithFailedRemove() throws Exception {
-
+
final int numCheckpointsToRetain = 1;
CompletedCheckpointStore store = createCompletedCheckpoints(numCheckpointsToRetain);
@@ -100,7 +101,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
doReturn(i).when(checkpointToAdd).getCheckpointID();
doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
- doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(any(SharedStateRegistry.class));
+ doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume();
try {
store.addCheckpoint(checkpointToAdd);
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 44c802b..81ee4f9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -22,6 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.zookeeper.data.Stat;
@@ -34,7 +35,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@@ -82,10 +82,14 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
*/
@Test
public void testRecover() throws Exception {
- AbstractCompletedCheckpointStore checkpoints = createCompletedCheckpoints(3);
- TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[] {
- createCheckpoint(0), createCheckpoint(1), createCheckpoint(2)
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3);
+
+ TestCompletedCheckpoint[] expected = new TestCompletedCheckpoint[]{
+ createCheckpoint(0, sharedStateRegistry),
+ createCheckpoint(1, sharedStateRegistry),
+ createCheckpoint(2, sharedStateRegistry)
};
// Add multiple checkpoints
@@ -93,16 +97,17 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
checkpoints.addCheckpoint(expected[1]);
checkpoints.addCheckpoint(expected[2]);
- verifyCheckpointRegistered(expected[0].getOperatorStates().values(), checkpoints.sharedStateRegistry);
- verifyCheckpointRegistered(expected[1].getOperatorStates().values(), checkpoints.sharedStateRegistry);
- verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry);
+ verifyCheckpointRegistered(expected[0].getOperatorStates().values(), sharedStateRegistry);
+ verifyCheckpointRegistered(expected[1].getOperatorStates().values(), sharedStateRegistry);
+ verifyCheckpointRegistered(expected[2].getOperatorStates().values(), sharedStateRegistry);
// All three should be in ZK
assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
// Recover
- checkpoints.recover();
+ sharedStateRegistry.clear();
+ checkpoints.recover(sharedStateRegistry);
assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -111,7 +116,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
List<CompletedCheckpoint> expectedCheckpoints = new ArrayList<>(3);
expectedCheckpoints.add(expected[1]);
expectedCheckpoints.add(expected[2]);
- expectedCheckpoints.add(createCheckpoint(3));
+ expectedCheckpoints.add(createCheckpoint(3, sharedStateRegistry));
checkpoints.addCheckpoint(expectedCheckpoints.get(2));
@@ -120,7 +125,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals(expectedCheckpoints, actualCheckpoints);
for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
- verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), checkpoints.sharedStateRegistry);
+ verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), sharedStateRegistry);
}
}
@@ -131,8 +136,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
public void testShutdownDiscardsCheckpoints() throws Exception {
CuratorFramework client = ZOOKEEPER.getClient();
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
CompletedCheckpointStore store = createCompletedCheckpoints(1);
- TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+ TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -142,7 +148,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals(0, store.getNumberOfRetainedCheckpoints());
assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
- store.recover();
+ sharedStateRegistry.clear();
+ store.recover(sharedStateRegistry);
assertEquals(0, store.getNumberOfRetainedCheckpoints());
}
@@ -156,8 +163,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
public void testSuspendKeepsCheckpoints() throws Exception {
CuratorFramework client = ZOOKEEPER.getClient();
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
CompletedCheckpointStore store = createCompletedCheckpoints(1);
- TestCompletedCheckpoint checkpoint = createCheckpoint(0);
+ TestCompletedCheckpoint checkpoint = createCheckpoint(0, sharedStateRegistry);
store.addCheckpoint(checkpoint);
assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -174,7 +182,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren());
// Recover again
- store.recover();
+ sharedStateRegistry.clear();
+ store.recover(sharedStateRegistry);
CompletedCheckpoint recovered = store.getLatestCheckpoint();
assertEquals(checkpoint, recovered);
@@ -188,18 +197,20 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
@Test
public void testLatestCheckpointRecovery() throws Exception {
final int numCheckpoints = 3;
- AbstractCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ CompletedCheckpointStore checkpointStore = createCompletedCheckpoints(numCheckpoints);
List<CompletedCheckpoint> checkpoints = new ArrayList<>(numCheckpoints);
- checkpoints.add(createCheckpoint(9));
- checkpoints.add(createCheckpoint(10));
- checkpoints.add(createCheckpoint(11));
+ checkpoints.add(createCheckpoint(9, sharedStateRegistry));
+ checkpoints.add(createCheckpoint(10, sharedStateRegistry));
+ checkpoints.add(createCheckpoint(11, sharedStateRegistry));
for (CompletedCheckpoint checkpoint : checkpoints) {
checkpointStore.addCheckpoint(checkpoint);
}
- checkpointStore.recover();
+ sharedStateRegistry.clear();
+ checkpointStore.recover(sharedStateRegistry);
CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint();
@@ -220,13 +231,16 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
ZooKeeperCompletedCheckpointStore zkCheckpointStore1 = createCompletedCheckpoints(numberOfCheckpoints);
ZooKeeperCompletedCheckpointStore zkCheckpointStore2 = createCompletedCheckpoints(numberOfCheckpoints);
- TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1);
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+
+ TestCompletedCheckpoint completedCheckpoint = createCheckpoint(1, sharedStateRegistry);
// complete the first checkpoint
zkCheckpointStore1.addCheckpoint(completedCheckpoint);
// recover the checkpoint by a different checkpoint store
- zkCheckpointStore2.recover();
+ sharedStateRegistry.clear();
+ zkCheckpointStore2.recover(sharedStateRegistry);
CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint();
assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
@@ -237,7 +251,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
// complete another checkpoint --> this should remove the first checkpoint from the store
// because the number of retained checkpoints == 1
- TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2);
+ TestCompletedCheckpoint completedCheckpoint2 = createCheckpoint(2, sharedStateRegistry);
zkCheckpointStore1.addCheckpoint(completedCheckpoint2);
List<CompletedCheckpoint> allCheckpoints = zkCheckpointStore1.getAllCheckpoints();
@@ -251,7 +265,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
// check that we have not discarded the first completed checkpoint
assertFalse(recoveredTestCheckpoint.isDiscarded());
- TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3);
+ TestCompletedCheckpoint completedCheckpoint3 = createCheckpoint(3, sharedStateRegistry);
// this should release the last lock on completedCheckpoint and thus discard it
zkCheckpointStore2.addCheckpoint(completedCheckpoint3);
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 7d22d8e..23cc8c8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -27,6 +27,7 @@ import org.apache.curator.utils.EnsurePath;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.TestLogger;
@@ -158,7 +159,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
stateStorage,
Executors.directExecutor());
- zooKeeperCompletedCheckpointStore.recover();
+ SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
+ zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index 2251e46..a0c4412 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -18,9 +18,10 @@
package org.apache.flink.runtime.testutils;
-import org.apache.flink.runtime.checkpoint.AbstractCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,7 +33,7 @@ import java.util.List;
* A checkpoint store, which supports shutdown and suspend. You can use this to test HA
* as long as the factory always returns the same store instance.
*/
-public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckpointStore {
+public class RecoverableCompletedCheckpointStore implements CompletedCheckpointStore {
private static final Logger LOG = LoggerFactory.getLogger(RecoverableCompletedCheckpointStore.class);
@@ -41,26 +42,24 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
@Override
- public void recover() throws Exception {
+ public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
checkpoints.addAll(suspended);
suspended.clear();
for (CompletedCheckpoint checkpoint : checkpoints) {
- checkpoint.registerSharedStates(sharedStateRegistry);
+ checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
}
}
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
- checkpoint.registerSharedStates(sharedStateRegistry);
-
checkpoints.addLast(checkpoint);
if (checkpoints.size() > 1) {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
- checkpointToSubsume.discardOnSubsume(sharedStateRegistry);
+ checkpointToSubsume.discardOnSubsume();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/04035639/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index f9af603..33c3454 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
@@ -147,6 +148,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
private static final int Parallelism = 8;
private static CountDownLatch CompletedCheckpointsLatch = new CountDownLatch(4);
+ private static CountDownLatch CompletedCheckpointsLatch2 = new CountDownLatch(6);
private static AtomicLongArray RecoveredStates = new AtomicLongArray(Parallelism);
@@ -171,8 +173,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
final String fileStateBackendPath = FileStateBackendBasePath.getAbsoluteFile().toString();
Configuration config = ZooKeeperTestUtils.createZooKeeperHAConfig(
- zooKeeperQuorum,
- fileStateBackendPath);
+ zooKeeperQuorum,
+ fileStateBackendPath);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 2);
@@ -188,7 +190,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
try {
// Test actor system
testActorSystem = AkkaUtils.createActorSystem(new Configuration(),
- new Some<>(new Tuple2<String, Object>("localhost", 0)));
+ new Some<>(new Tuple2<String, Object>("localhost", 0)));
// The job managers
jobManagerProcess[0] = new JobManagerProcess(0, config);
@@ -204,7 +206,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// The task manager
taskManagerSystem = AkkaUtils.createActorSystem(
- config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
+ config, Option.apply(new Tuple2<String, Object>("localhost", 0)));
TaskManager.startTaskManagerComponentsAndActor(
config,
ResourceID.generate(),
@@ -223,7 +225,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// Get the leader ref
ActorRef leaderRef = AkkaUtils.getActorRef(
- leaderAddress, testActorSystem, testDeadline.timeLeft());
+ leaderAddress, testActorSystem, testDeadline.timeLeft());
ActorGateway leader = new AkkaActorGateway(leaderRef, leaderId);
// Who's the boss?
@@ -248,10 +250,10 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
// Wait for the job to be running
JobManagerActorTestUtils.waitForJobStatus(
- jobGraph.getJobID(),
- JobStatus.RUNNING,
- leader,
- testDeadline.timeLeft());
+ jobGraph.getJobID(),
+ JobStatus.RUNNING,
+ leader,
+ testDeadline.timeLeft());
// Remove all files
FileUtils.deleteDirectory(FileStateBackendBasePath);
@@ -268,7 +270,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
if (output != null) {
if (output.contains("Failed to recover job") &&
- output.contains("java.io.FileNotFoundException")) {
+ output.contains("java.io.FileNotFoundException")) {
success = true;
break;
@@ -352,10 +354,12 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
try {
Configuration config = new Configuration();
+
config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, retainedCheckpoints);
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, numJMs);
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTMs);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlots);
+ config.setString(ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, temporaryFolder.newFolder().toString());
String tmpFolderString = temporaryFolder.newFolder().toString();
@@ -372,6 +376,8 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(Parallelism);
env.enableCheckpointing(checkpointingInterval);
+ env.getCheckpointConfig()
+ .enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//TODO parameterize
env.setStateBackend(stateBackend);
@@ -427,7 +433,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
* A checkpointed source, which emits elements from 0 to a configured number.
*/
public static class CheckpointedSequenceSource extends RichParallelSourceFunction<Long>
- implements ListCheckpointed<Tuple2<Long, Integer>> {
+ implements ListCheckpointed<Tuple2<Long, Integer>> {
private static final Logger LOG = LoggerFactory.getLogger(CheckpointedSequenceSource.class);
@@ -465,8 +471,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
--repeat;
current = 0;
} else {
- ctx.collect(LastElement);
- return;
+ isRunning = false;
}
}
@@ -475,6 +480,11 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
Thread.sleep(50);
}
}
+
+ CompletedCheckpointsLatch2.await();
+ synchronized (ctx.getCheckpointLock()) {
+ ctx.collect(LastElement);
+ }
}
@Override
@@ -563,6 +573,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
public void notifyCheckpointComplete(long checkpointId) throws Exception {
LOG.debug("Checkpoint {} completed.", checkpointId);
CompletedCheckpointsLatch.countDown();
+ CompletedCheckpointsLatch2.countDown();
}
}