You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/05/19 09:01:49 UTC
[4/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore
[FLINK-6633] Register shared state before adding to CompletedCheckpointStore
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6200407
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6200407
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6200407
Branch: refs/heads/release-1.3
Commit: f6200407979fb6987f86d7029df81b345f0d9525
Parents: f58fec7
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue May 16 12:32:05 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri May 19 11:01:12 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 56 +--
.../state/RocksDBStateBackendTest.java | 88 ++++-
.../runtime/checkpoint/CompletedCheckpoint.java | 144 ++-----
.../flink/runtime/checkpoint/OperatorState.java | 7 -
.../checkpoint/OperatorSubtaskState.java | 11 -
.../StandaloneCompletedCheckpointStore.java | 4 +-
.../flink/runtime/checkpoint/SubtaskState.java | 11 -
.../flink/runtime/checkpoint/TaskState.java | 7 -
.../ZooKeeperCompletedCheckpointStore.java | 149 +++-----
.../savepoint/SavepointV2Serializer.java | 17 +-
.../runtime/state/CompositeStateHandle.java | 15 +-
.../state/IncrementalKeyedStateHandle.java | 171 ++++-----
.../runtime/state/KeyGroupsStateHandle.java | 5 -
.../state/PlaceholderStreamStateHandle.java | 44 +--
.../runtime/state/SharedStateRegistry.java | 54 +--
.../state/memory/ByteStreamStateHandle.java | 7 +
.../checkpoint/CheckpointCoordinatorTest.java | 25 --
.../CompletedCheckpointStoreTest.java | 61 +--
.../checkpoint/CompletedCheckpointTest.java | 3 -
.../checkpoint/PendingCheckpointTest.java | 1 -
...ZooKeeperCompletedCheckpointStoreITCase.java | 7 +-
.../savepoint/CheckpointTestUtils.java | 25 +-
.../state/IncrementalKeyedStateHandleTest.java | 206 ++++++++++
.../runtime/state/SharedStateRegistryTest.java | 14 +-
.../runtime/state/StateBackendTestBase.java | 2 -
.../RecoverableCompletedCheckpointStore.java | 5 +-
...tractEventTimeWindowCheckpointingITCase.java | 9 +-
.../JobManagerHACheckpointRecoveryITCase.java | 375 +++++++++----------
28 files changed, 747 insertions(+), 776 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 88a759d..1f32a89 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,6 +105,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
@@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** True if incremental checkpointing is enabled */
private final boolean enableIncrementalCheckpointing;
- /** The sst files materialized in pending checkpoints */
- private final SortedMap<Long, Map<StateHandleID, StreamStateHandle>> materializedSstFiles = new TreeMap<>();
+ /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */
+ private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles = new TreeMap<>();
/** The identifier of the last completed checkpoint */
private long lastCompletedCheckpointId = -1;
@@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final long checkpointTimestamp;
/** All sst files that were part of the last previously completed checkpoint */
- private Map<StateHandleID, StreamStateHandle> baseSstFiles;
+ private Set<StateHandleID> baseSstFiles;
/** The state meta data */
private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
@@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private final CloseableRegistry closeableRegistry = new CloseableRegistry();
// new sst files since the last completed checkpoint
- private final Map<StateHandleID, StreamStateHandle> newSstFiles = new HashMap<>();
-
- // old sst files which have been materialized in previous completed checkpoints
- private final Map<StateHandleID, StreamStateHandle> oldSstFiles = new HashMap<>();
+ private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
// handles to the misc files in the current snapshot
private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
@@ -830,7 +828,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// use the last completed checkpoint as the comparison base.
baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
-
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
@@ -867,18 +864,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
final StateHandleID stateHandleID = new StateHandleID(fileName);
if (fileName.endsWith(SST_FILE_SUFFIX)) {
- StreamStateHandle fileHandle =
- baseSstFiles == null ? null : baseSstFiles.get(fileName);
+ final boolean existsAlready =
+ baseSstFiles == null ? false : baseSstFiles.contains(stateHandleID);
- if (fileHandle == null) {
- fileHandle = materializeStateData(filePath);
- newSstFiles.put(stateHandleID, fileHandle);
- } else {
+ if (existsAlready) {
// we introduce a placeholder state handle, that is replaced with the
// original from the shared state registry (created from a previous checkpoint)
- oldSstFiles.put(
+ sstFiles.put(
stateHandleID,
- new PlaceholderStreamStateHandle(fileHandle.getStateSize()));
+ new PlaceholderStreamStateHandle());
+ } else {
+ sstFiles.put(stateHandleID, materializeStateData(filePath));
}
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
@@ -887,22 +883,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
- Map<StateHandleID, StreamStateHandle> sstFiles =
- new HashMap<>(newSstFiles.size() + oldSstFiles.size());
- sstFiles.putAll(newSstFiles);
- sstFiles.putAll(oldSstFiles);
synchronized (stateBackend.asyncSnapshotLock) {
- stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
+ stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
}
return new IncrementalKeyedStateHandle(
stateBackend.operatorIdentifier,
stateBackend.keyGroupRange,
checkpointId,
- newSstFiles,
- oldSstFiles,
+ sstFiles,
miscFiles,
metaStateHandle);
}
@@ -933,7 +924,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
statesToDiscard.add(metaStateHandle);
statesToDiscard.addAll(miscFiles.values());
- statesToDiscard.addAll(newSstFiles.values());
+ statesToDiscard.addAll(sstFiles.values());
try {
StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
@@ -1308,15 +1299,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
UUID.randomUUID().toString());
try {
- final Map<StateHandleID, StreamStateHandle> newSstFiles =
- restoreStateHandle.getCreatedSharedState();
- final Map<StateHandleID, StreamStateHandle> oldSstFiles =
- restoreStateHandle.getReferencedSharedState();
+ final Map<StateHandleID, StreamStateHandle> sstFiles =
+ restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();
- readAllStateData(newSstFiles, restoreInstancePath);
- readAllStateData(oldSstFiles, restoreInstancePath);
+ readAllStateData(sstFiles, restoreInstancePath);
readAllStateData(miscFiles, restoreInstancePath);
// read meta data
@@ -1409,8 +1397,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
throw new IOException("Could not create RocksDB data directory.");
}
- createFileHardLinksInRestorePath(newSstFiles, restoreInstancePath);
- createFileHardLinksInRestorePath(oldSstFiles, restoreInstancePath);
+ createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
@@ -1437,10 +1424,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
// use the restore sst files as the base for succeeding checkpoints
- Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
- sstFiles.putAll(newSstFiles);
- sstFiles.putAll(oldSstFiles);
- stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles);
+ stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 9340455..89eb1d5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,18 +26,22 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackendTestBase;
+import org.apache.flink.runtime.state.StateHandleID;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -58,7 +62,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.RunnableFuture;
import static junit.framework.TestCase.assertNotNull;
@@ -67,6 +75,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import static org.powermock.api.mockito.PowerMockito.mock;
@@ -351,6 +360,83 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
assertEquals(1, allFilesInDbDir.size());
}
+ @Test
+ public void testSharedIncrementalStateDeRegistration() throws Exception {
+ if (enableIncrementalCheckpointing) {
+ AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+ ValueStateDescriptor<String> kvId =
+ new ValueStateDescriptor<>("id", String.class, null);
+
+ kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+
+ ValueState<String> state =
+ backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+
+
+ Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
+ SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
+ for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
+
+ reset(sharedStateRegistry);
+
+ backend.setCurrentKey(checkpointId);
+ state.update("Hello-" + checkpointId);
+
+ RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot(
+ checkpointId,
+ checkpointId,
+ createStreamFactory(),
+ CheckpointOptions.forFullCheckpoint());
+
+ snapshot.run();
+
+ IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get();
+ Map<StateHandleID, StreamStateHandle> sharedState =
+ new HashMap<>(stateHandle.getSharedState());
+
+ stateHandle.registerSharedStates(sharedStateRegistry);
+
+ for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
+ verify(sharedStateRegistry).registerReference(
+ stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+ e.getValue());
+ }
+
+ previousStateHandles.add(stateHandle);
+ backend.notifyCheckpointComplete(checkpointId);
+
+ //-----------------------------------------------------------------
+
+ if (previousStateHandles.size() > 1) {
+ checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+ }
+ }
+
+ while (!previousStateHandles.isEmpty()) {
+
+ reset(sharedStateRegistry);
+
+ checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+ }
+
+ backend.close();
+ backend.dispose();
+ }
+ }
+
+ private void checkRemove(IncrementalKeyedStateHandle remove, SharedStateRegistry registry) throws Exception {
+ for (StateHandleID id : remove.getSharedState().keySet()) {
+ verify(registry, times(0)).unregisterReference(
+ remove.createSharedStateRegistryKeyFromFileName(id));
+ }
+
+ remove.discardState();
+
+ for (StateHandleID id : remove.getSharedState().keySet()) {
+ verify(registry).unregisterReference(
+ remove.createSharedStateRegistryKeyFromFileName(id));
+ }
+ }
private void runStateUpdates() throws Exception{
for (int i = 50; i < 150; ++i) {
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 1ab5b41..b382080 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -25,8 +25,6 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,13 +175,13 @@ public class CompletedCheckpoint implements Serializable {
}
public void discardOnFailedStoring() throws Exception {
- new UnstoredDiscardStategy().discard();
+ doDiscard();
}
public boolean discardOnSubsume(SharedStateRegistry sharedStateRegistry) throws Exception {
if (props.discardOnSubsumed()) {
- new StoredDiscardStrategy(sharedStateRegistry).discard();
+ doDiscard();
return true;
}
@@ -197,7 +195,7 @@ public class CompletedCheckpoint implements Serializable {
jobStatus == JobStatus.FAILED && props.discardOnJobFailed() ||
jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended()) {
- new StoredDiscardStrategy(sharedStateRegistry).discard();
+ doDiscard();
return true;
} else {
if (externalPointer != null) {
@@ -209,6 +207,42 @@ public class CompletedCheckpoint implements Serializable {
}
}
+ private void doDiscard() throws Exception {
+
+ try {
+ // collect exceptions and continue cleanup
+ Exception exception = null;
+
+ // drop the metadata, if we have some
+ if (externalizedMetadata != null) {
+ try {
+ externalizedMetadata.discardState();
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+
+ // discard private state objects
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (exception != null) {
+ throw exception;
+ }
+ } finally {
+ operatorStates.clear();
+
+ // to be null-pointer safe, copy reference to stack
+ CompletedCheckpointStats.DiscardCallback discardCallback = this.discardCallback;
+ if (discardCallback != null) {
+ discardCallback.notifyDiscardedCheckpoint();
+ }
+ }
+ }
+
public long getStateSize() {
long result = 0L;
@@ -252,7 +286,7 @@ public class CompletedCheckpoint implements Serializable {
/**
* Register all shared states in the given registry. This is method is called
- * when the completed checkpoint has been successfully added into the store.
+ * before the checkpoint is added into the store.
*
* @param sharedStateRegistry The registry where shared states are registered
*/
@@ -266,102 +300,4 @@ public class CompletedCheckpoint implements Serializable {
public String toString() {
return String.format("Checkpoint %d @ %d for %s", checkpointID, timestamp, job);
}
-
- /**
- * Base class for the discarding strategies of {@link CompletedCheckpoint}.
- */
- private abstract class DiscardStrategy {
-
- protected Exception storedException;
-
- public DiscardStrategy() {
- this.storedException = null;
- }
-
- public void discard() throws Exception {
-
- try {
- // collect exceptions and continue cleanup
- storedException = null;
-
- doDiscardExternalizedMetaData();
- doDiscardSharedState();
- doDiscardPrivateState();
- doReportStoredExceptions();
- } finally {
- clearTaskStatesAndNotifyDiscardCompleted();
- }
- }
-
- protected void doDiscardExternalizedMetaData() {
- // drop the metadata, if we have some
- if (externalizedMetadata != null) {
- try {
- externalizedMetadata.discardState();
- } catch (Exception e) {
- storedException = e;
- }
- }
- }
-
- protected void doDiscardPrivateState() {
- // discard private state objects
- try {
- StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
- } catch (Exception e) {
- storedException = ExceptionUtils.firstOrSuppressed(e, storedException);
- }
- }
-
- protected abstract void doDiscardSharedState();
-
- protected void doReportStoredExceptions() throws Exception {
- if (storedException != null) {
- throw storedException;
- }
- }
-
- protected void clearTaskStatesAndNotifyDiscardCompleted() {
- operatorStates.clear();
- // to be null-pointer safe, copy reference to stack
- CompletedCheckpointStats.DiscardCallback discardCallback =
- CompletedCheckpoint.this.discardCallback;
-
- if (discardCallback != null) {
- discardCallback.notifyDiscardedCheckpoint();
- }
- }
- }
-
- /**
- * Discard all shared states created in the checkpoint. This strategy is applied
- * when the completed checkpoint fails to be added into the store.
- */
- private class UnstoredDiscardStategy extends CompletedCheckpoint.DiscardStrategy {
-
- @Override
- protected void doDiscardSharedState() {
- // nothing to do because we did not register any shared state yet. unregistered, new
- // shared state is then still considered private state and deleted as part of
- // doDiscardPrivateState().
- }
- }
-
- /**
- * Unregister all shared states from the given registry. This is strategy is
- * applied when the completed checkpoint is subsumed or the job terminates.
- */
- private class StoredDiscardStrategy extends CompletedCheckpoint.DiscardStrategy {
-
- SharedStateRegistry sharedStateRegistry;
-
- public StoredDiscardStrategy(SharedStateRegistry sharedStateRegistry) {
- this.sharedStateRegistry = Preconditions.checkNotNull(sharedStateRegistry);
- }
-
- @Override
- protected void doDiscardSharedState() {
- sharedStateRegistry.unregisterAll(operatorStates.values());
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index aa676e7..b153028 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -126,13 +126,6 @@ public class OperatorState implements CompositeStateHandle {
}
@Override
- public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
- for (OperatorSubtaskState operatorSubtaskState : operatorSubtaskStates.values()) {
- operatorSubtaskState.unregisterSharedStates(sharedStateRegistry);
- }
- }
-
- @Override
public long getStateSize() {
long result = 0L;
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 49ef863..e2ae632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -158,17 +158,6 @@ public class OperatorSubtaskState implements CompositeStateHandle {
}
@Override
- public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
- if (managedKeyedState != null) {
- managedKeyedState.unregisterSharedStates(sharedStateRegistry);
- }
-
- if (rawKeyedState != null) {
- rawKeyedState.unregisterSharedStates(sharedStateRegistry);
- }
- }
-
- @Override
public long getStateSize() {
return stateSize;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index f5e1db3..233cfc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -63,10 +63,10 @@ public class StandaloneCompletedCheckpointStore extends AbstractCompletedCheckpo
@Override
public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
- checkpoints.addLast(checkpoint);
-
checkpoint.registerSharedStates(sharedStateRegistry);
+ checkpoints.addLast(checkpoint);
+
if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
index a77baf3..20d675b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskState.java
@@ -162,17 +162,6 @@ public class SubtaskState implements CompositeStateHandle {
}
@Override
- public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
- if (managedKeyedState != null) {
- managedKeyedState.unregisterSharedStates(sharedStateRegistry);
- }
-
- if (rawKeyedState != null) {
- rawKeyedState.unregisterSharedStates(sharedStateRegistry);
- }
- }
-
- @Override
public long getStateSize() {
return stateSize;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index aa5c516..ed847a4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -141,13 +141,6 @@ public class TaskState implements CompositeStateHandle {
}
@Override
- public void unregisterSharedStates(SharedStateRegistry sharedStateRegistry) {
- for (SubtaskState subtaskState : subtaskStates.values()) {
- subtaskState.unregisterSharedStates(sharedStateRegistry);
- }
- }
-
- @Override
public long getStateSize() {
long result = 0L;
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 084d93e..4c3c1ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -35,7 +35,6 @@ import javax.annotation.Nullable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
-import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
@@ -79,7 +78,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
private final int maxNumberOfCheckpointsToRetain;
/** Local completed checkpoints. */
- private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+ private final ArrayDeque<CompletedCheckpoint> completedCheckpoints;
/**
* Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
@@ -122,7 +121,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
this.checkpointsInZooKeeper = new ZooKeeperStateHandleStore<>(this.client, stateStorage, executor);
- this.checkpointStateHandles = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
+ this.completedCheckpoints = new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
LOG.info("Initialized in '{}'.", checkpointsPath);
}
@@ -146,7 +145,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
- checkpointStateHandles.clear();
+ completedCheckpoints.clear();
// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,6 +169,11 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
+ if (completedCheckpoint != null) {
+ // Re-register all shared states in the checkpoint.
+ completedCheckpoint.registerSharedStates(sharedStateRegistry);
+ completedCheckpoints.add(completedCheckpoint);
+ }
} catch (Exception e) {
LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
"checkpoint store.", e);
@@ -177,11 +181,6 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
// remove the checkpoint with broken state handle
removeBrokenStateHandle(checkpointStateHandle.f1, checkpointStateHandle.f0);
}
-
- if (completedCheckpoint != null) {
- completedCheckpoint.registerSharedStates(sharedStateRegistry);
- checkpointStateHandles.add(checkpointStateHandle);
- }
}
}
@@ -195,20 +194,19 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
checkNotNull(checkpoint, "Checkpoint");
final String path = checkpointIdToPath(checkpoint.getCheckpointID());
- final RetrievableStateHandle<CompletedCheckpoint> stateHandle;
- // First add the new one. If it fails, we don't want to loose existing data.
- stateHandle = checkpointsInZooKeeper.addAndLock(path, checkpoint);
+ // First, register all shared states in the checkpoint to consolidate placeholders.
+ checkpoint.registerSharedStates(sharedStateRegistry);
- checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
+ // Now add the new one. If it fails, we don't want to lose existing data.
+ checkpointsInZooKeeper.addAndLock(path, checkpoint);
- // Register all shared states in the checkpoint
- checkpoint.registerSharedStates(sharedStateRegistry);
+ completedCheckpoints.addLast(checkpoint);
// Everything worked, let's remove a previous checkpoint if necessary.
- while (checkpointStateHandles.size() > maxNumberOfCheckpointsToRetain) {
+ while (completedCheckpoints.size() > maxNumberOfCheckpointsToRetain) {
try {
- removeSubsumed(checkpointStateHandles.removeFirst().f1, sharedStateRegistry);
+ removeSubsumed(completedCheckpoints.removeFirst(), sharedStateRegistry);
} catch (Exception e) {
LOG.warn("Failed to subsume the old checkpoint", e);
}
@@ -219,60 +217,23 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
@Override
public CompletedCheckpoint getLatestCheckpoint() {
- if (checkpointStateHandles.isEmpty()) {
+ if (completedCheckpoints.isEmpty()) {
return null;
}
else {
- while(!checkpointStateHandles.isEmpty()) {
- Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpointStateHandle = checkpointStateHandles.peekLast();
-
- try {
- return retrieveCompletedCheckpoint(checkpointStateHandle);
- } catch (Exception e) {
- LOG.warn("Could not retrieve latest checkpoint. Removing it from " +
- "the completed checkpoint store.", e);
-
- try {
- // remove the checkpoint with broken state handle
- Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint = checkpointStateHandles.pollLast();
- removeBrokenStateHandle(checkpoint.f1, checkpoint.f0);
- } catch (Exception removeException) {
- LOG.warn("Could not remove the latest checkpoint with a broken state handle.", removeException);
- }
- }
- }
-
- return null;
+ return completedCheckpoints.peekLast();
}
}
@Override
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
- List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
-
- Iterator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> stateHandleIterator = checkpointStateHandles.iterator();
-
- while (stateHandleIterator.hasNext()) {
- Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandlePath = stateHandleIterator.next();
-
- try {
- checkpoints.add(retrieveCompletedCheckpoint(stateHandlePath));
- } catch (Exception e) {
- LOG.warn("Could not retrieve checkpoint. Removing it from the completed " +
- "checkpoint store.", e);
-
- // remove the checkpoint with broken state handle
- stateHandleIterator.remove();
- removeBrokenStateHandle(stateHandlePath.f1, stateHandlePath.f0);
- }
- }
-
+ List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints);
return checkpoints;
}
@Override
public int getNumberOfRetainedCheckpoints() {
- return checkpointStateHandles.size();
+ return completedCheckpoints.size();
}
@Override
@@ -285,15 +246,15 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
if (jobStatus.isGloballyTerminalState()) {
LOG.info("Shutting down");
- for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+ for (CompletedCheckpoint checkpoint : completedCheckpoints) {
try {
- removeShutdown(checkpoint.f1, jobStatus, sharedStateRegistry);
+ removeShutdown(checkpoint, jobStatus, sharedStateRegistry);
} catch (Exception e) {
LOG.error("Failed to discard checkpoint.", e);
}
}
- checkpointStateHandles.clear();
+ completedCheckpoints.clear();
String path = "/" + client.getNamespace();
@@ -303,7 +264,7 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
LOG.info("Suspending");
// Clear the local handles, but don't remove any state
- checkpointStateHandles.clear();
+ completedCheckpoints.clear();
// Release the state handle locks in ZooKeeper such that they can be deleted
checkpointsInZooKeeper.releaseAll();
@@ -313,21 +274,18 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
// ------------------------------------------------------------------------
private void removeSubsumed(
- final String pathInZooKeeper,
+ final CompletedCheckpoint completedCheckpoint,
final SharedStateRegistry sharedStateRegistry) throws Exception {
-
- ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
- @Override
- public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
- if (value != null) {
- final CompletedCheckpoint completedCheckpoint;
- try {
- completedCheckpoint = value.retrieveState();
- } catch (Exception e) {
- throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
- }
- if (completedCheckpoint != null) {
+ if(completedCheckpoint == null) {
+ return;
+ }
+
+ ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> action =
+ new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
+ @Override
+ public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
+ if (value != null) {
try {
completedCheckpoint.discardOnSubsume(sharedStateRegistry);
} catch (Exception e) {
@@ -335,46 +293,41 @@ public class ZooKeeperCompletedCheckpointStore extends AbstractCompletedCheckpoi
}
}
}
- }
- };
+ };
- checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, action);
+ checkpointsInZooKeeper.releaseAndTryRemove(
+ checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+ action);
}
private void removeShutdown(
- final String pathInZooKeeper,
+ final CompletedCheckpoint completedCheckpoint,
final JobStatus jobStatus,
final SharedStateRegistry sharedStateRegistry) throws Exception {
+ if(completedCheckpoint == null) {
+ return;
+ }
+
ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint> removeAction = new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
@Override
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
- if (value != null) {
- final CompletedCheckpoint completedCheckpoint;
-
- try {
- completedCheckpoint = value.retrieveState();
- } catch (Exception e) {
- throw new FlinkException("Could not retrieve the completed checkpoint from the given state handle.", e);
- }
-
- if (completedCheckpoint != null) {
- try {
- completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
- } catch (Exception e) {
- throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
- }
- }
+ try {
+ completedCheckpoint.discardOnShutdown(jobStatus, sharedStateRegistry);
+ } catch (Exception e) {
+ throw new FlinkException("Could not discard the completed checkpoint on subsume.", e);
}
}
};
- checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, removeAction);
+ checkpointsInZooKeeper.releaseAndTryRemove(
+ checkpointIdToPath(completedCheckpoint.getCheckpointID()),
+ removeAction);
}
private void removeBrokenStateHandle(
- final String pathInZooKeeper,
- final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
+ final String pathInZooKeeper,
+ final RetrievableStateHandle<CompletedCheckpoint> retrievableStateHandle) throws Exception {
checkpointsInZooKeeper.releaseAndTryRemove(pathInZooKeeper, new ZooKeeperStateHandleStore.RemoveCallback<CompletedCheckpoint>() {
@Override
public void apply(@Nullable RetrievableStateHandle<CompletedCheckpoint> value) throws FlinkException {
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index b71418b..da0022c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
@@ -75,7 +74,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
private static final byte KEY_GROUPS_HANDLE = 3;
private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
private static final byte INCREMENTAL_KEY_GROUPS_HANDLE = 5;
- private static final byte PLACEHOLDER_STREAM_STATE_HANDLE = 6;
/** The singleton instance of the serializer */
public static final SavepointV2Serializer INSTANCE = new SavepointV2Serializer();
@@ -328,8 +326,7 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
serializeStreamStateHandle(incrementalKeyedStateHandle.getMetaStateHandle(), dos);
- serializeStreamStateHandleMap(incrementalKeyedStateHandle.getCreatedSharedState(), dos);
- serializeStreamStateHandleMap(incrementalKeyedStateHandle.getReferencedSharedState(), dos);
+ serializeStreamStateHandleMap(incrementalKeyedStateHandle.getSharedState(), dos);
serializeStreamStateHandleMap(incrementalKeyedStateHandle.getPrivateState(), dos);
} else {
throw new IllegalStateException("Unknown KeyedStateHandle type: " + stateHandle.getClass());
@@ -390,16 +387,14 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
StreamStateHandle metaDataStateHandle = deserializeStreamStateHandle(dis);
- Map<StateHandleID, StreamStateHandle> createdStates = deserializeStreamStateHandleMap(dis);
- Map<StateHandleID, StreamStateHandle> referencedStates = deserializeStreamStateHandleMap(dis);
+ Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis);
Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis);
return new IncrementalKeyedStateHandle(
operatorId,
keyGroupRange,
checkpointId,
- createdStates,
- referencedStates,
+ sharedStates,
privateStates,
metaDataStateHandle);
} else {
@@ -485,10 +480,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
byte[] internalData = byteStreamStateHandle.getData();
dos.writeInt(internalData.length);
dos.write(byteStreamStateHandle.getData());
- } else if (stateHandle instanceof PlaceholderStreamStateHandle) {
- PlaceholderStreamStateHandle placeholder = (PlaceholderStreamStateHandle) stateHandle;
- dos.writeByte(PLACEHOLDER_STREAM_STATE_HANDLE);
- dos.writeLong(placeholder.getStateSize());
} else {
throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
}
@@ -510,8 +501,6 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> {
byte[] data = new byte[numBytes];
dis.readFully(data);
return new ByteStreamStateHandle(handleName, data);
- } else if (PLACEHOLDER_STREAM_STATE_HANDLE == type) {
- return new PlaceholderStreamStateHandle(dis.readLong());
} else {
throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
index 002b7c3..1bc6a0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CompositeStateHandle.java
@@ -34,7 +34,8 @@ package org.apache.flink.runtime.state;
* this handle and considered as private state until it is registered for the first time. Registration
* transfers ownership to the {@link SharedStateRegistry}.
* The composite state handle should only delete all private states in the
- * {@link StateObject#discardState()} method.
+ * {@link StateObject#discardState()} method, the {@link SharedStateRegistry} is responsible for
+ * deleting shared states after they were registered.
*/
public interface CompositeStateHandle extends StateObject {
@@ -45,18 +46,10 @@ public interface CompositeStateHandle extends StateObject {
* <p>
* After this is completed, newly created shared state is considered as published is no longer
* owned by this handle. This means that it should no longer be deleted as part of calls to
- * {@link #discardState()}.
+ * {@link #discardState()}. Instead, {@link #discardState()} will trigger an unregistration
+ * from the registry.
*
* @param stateRegistry The registry where shared states are registered.
*/
void registerSharedStates(SharedStateRegistry stateRegistry);
-
- /**
- * Unregister both created and referenced shared states in the given
- * {@link SharedStateRegistry}. This method is called when the checkpoint is
- * subsumed or the job is shut down.
- *
- * @param stateRegistry The registry where shared states are registered.
- */
- void unregisterSharedStates(SharedStateRegistry stateRegistry);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 706e219..770b5a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -28,18 +28,24 @@ import java.util.Map;
/**
* The handle to states of an incremental snapshot.
* <p>
- * The states contained in an incremental snapshot include
+ * The states contained in an incremental snapshot include:
* <ul>
- * <li> Created shared state which includes (the supposed to be) shared files produced since the last
+ * <li> Created shared state which includes shared files produced since the last
* completed checkpoint. These files can be referenced by succeeding checkpoints if the
* checkpoint succeeds to complete. </li>
* <li> Referenced shared state which includes the shared files materialized in previous
- * checkpoints. </li>
+ * checkpoints. Until this handle is registered with a {@link SharedStateRegistry}, all referenced
+ * shared state handles are only placeholders, so that we do not send state handles twice
+ * from which we know that they already exist on the checkpoint coordinator.</li>
* <li> Private state which includes all other files, typically mutable, that cannot be shared by
* other checkpoints. </li>
* <li> Backend meta state which includes the information of existing states. </li>
* </ul>
*
+ * When this should become a completed checkpoint on the checkpoint coordinator, it must first be
+ * registered with a {@link SharedStateRegistry}, so that all placeholder state handles to
+ * previously existing state are replaced with the originals.
+ *
* IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
* should not be called from production code. This means this class is also not suited to serve as
* a key, e.g. in hash maps.
@@ -66,14 +72,9 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private final long checkpointId;
/**
- * State that the incremental checkpoint created new
- */
- private final Map<StateHandleID, StreamStateHandle> createdSharedState;
-
- /**
- * State that the incremental checkpoint references from previous checkpoints
+ * Shared state in the incremental checkpoint. This may contain placeholders until it is registered.
*/
- private final Map<StateHandleID, StreamStateHandle> referencedSharedState;
+ private final Map<StateHandleID, StreamStateHandle> sharedState;
/**
* Private state in the incremental checkpoint
@@ -86,32 +87,30 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private final StreamStateHandle metaStateHandle;
/**
- * True if the state handle has already registered shared states.
- * <p>
- * Once the shared states are registered, it's the {@link SharedStateRegistry}'s
- * responsibility to maintain the shared states. But in the cases where the
- * state handle is discarded before performing the registration, the handle
- * should delete all the shared states created by it.
+ * Once the shared states are registered, it is the {@link SharedStateRegistry}'s
+ * responsibility to cleanup those shared states.
+ * But in the cases where the state handle is discarded before performing the registration,
+ * the handle should delete all the shared states created by it.
+ *
+ * This variable is not null iff the handle was registered.
*/
- private boolean registered;
+ private transient SharedStateRegistry sharedStateRegistry;
public IncrementalKeyedStateHandle(
String operatorIdentifier,
KeyGroupRange keyGroupRange,
long checkpointId,
- Map<StateHandleID, StreamStateHandle> createdSharedState,
- Map<StateHandleID, StreamStateHandle> referencedSharedState,
+ Map<StateHandleID, StreamStateHandle> sharedState,
Map<StateHandleID, StreamStateHandle> privateState,
StreamStateHandle metaStateHandle) {
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
this.checkpointId = checkpointId;
- this.createdSharedState = Preconditions.checkNotNull(createdSharedState);
- this.referencedSharedState = Preconditions.checkNotNull(referencedSharedState);
+ this.sharedState = Preconditions.checkNotNull(sharedState);
this.privateState = Preconditions.checkNotNull(privateState);
this.metaStateHandle = Preconditions.checkNotNull(metaStateHandle);
- this.registered = false;
+ this.sharedStateRegistry = null;
}
@Override
@@ -123,12 +122,8 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
return checkpointId;
}
- public Map<StateHandleID, StreamStateHandle> getCreatedSharedState() {
- return createdSharedState;
- }
-
- public Map<StateHandleID, StreamStateHandle> getReferencedSharedState() {
- return referencedSharedState;
+ public Map<StateHandleID, StreamStateHandle> getSharedState() {
+ return sharedState;
}
public Map<StateHandleID, StreamStateHandle> getPrivateState() {
@@ -155,8 +150,6 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@Override
public void discardState() throws Exception {
- Preconditions.checkState(!registered, "Attempt to dispose a registered composite state with registered shared state. Must unregister first.");
-
try {
metaStateHandle.discardState();
} catch (Exception e) {
@@ -169,37 +162,35 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
LOG.warn("Could not properly discard misc file states.", e);
}
- try {
- StateUtil.bestEffortDiscardAllStateObjects(createdSharedState.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard new sst file states.", e);
+ // If this was not registered, we can delete the shared state. We can simply apply this
+ // to all handles, because all handles that have not been created for the first time for this
+ // are only placeholders at this point (disposing them is a NOP).
+ if (sharedStateRegistry == null) {
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard new sst file states.", e);
+ }
+ } else {
+ // If this was registered, we only unregister all our referenced shared states
+ // from the registry.
+ for (StateHandleID stateHandleID : sharedState.keySet()) {
+ sharedStateRegistry.unregisterReference(
+ createSharedStateRegistryKeyFromFileName(stateHandleID));
+ }
}
-
}
@Override
public long getStateSize() {
- long size = getPrivateStateSize();
-
- for (StreamStateHandle oldSstFileHandle : referencedSharedState.values()) {
- size += oldSstFileHandle.getStateSize();
- }
-
- return size;
- }
-
- /**
- * Returns the size of the state that is privately owned by this handle.
- */
- public long getPrivateStateSize() {
long size = StateUtil.getStateSize(metaStateHandle);
- for (StreamStateHandle newSstFileHandle : createdSharedState.values()) {
- size += newSstFileHandle.getStateSize();
+ for (StreamStateHandle sharedStateHandle : sharedState.values()) {
+ size += sharedStateHandle.getStateSize();
}
- for (StreamStateHandle miscFileHandle : privateState.values()) {
- size += miscFileHandle.getStateSize();
+ for (StreamStateHandle privateStateHandle : privateState.values()) {
+ size += privateStateHandle.getStateSize();
}
return size;
@@ -208,64 +199,38 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
- Preconditions.checkState(!registered, "The state handle has already registered its shared states.");
+ Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+
+ sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
- for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) {
+ for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
+ createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
SharedStateRegistry.Result result =
- stateRegistry.registerNewReference(registryKey, newSstFileEntry.getValue());
-
- // We update our reference with the result from the registry, to prevent the following
- // problem:
+ stateRegistry.registerReference(registryKey, sharedStateHandle.getValue());
+
+ // This step consolidates our shared handles with the registry, which does two things:
+ //
+ // 1) Replace placeholder state handle with already registered, actual state handles.
+ //
+ // 2) Deduplicate re-uploads of incremental state due to missing confirmations about
+ // completed checkpoints.
+ //
+ // This prevents the following problem:
// A previous checkpoint n has already registered the state. This can happen if a
// following checkpoint (n + x) wants to reference the same state before the backend got
// notified that checkpoint n completed. In this case, the shared registry did
// deduplication and returns the previous reference.
- newSstFileEntry.setValue(result.getReference());
- }
-
- for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileName : referencedSharedState.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(oldSstFileName.getKey());
-
- SharedStateRegistry.Result result = stateRegistry.obtainReference(registryKey);
-
- // Again we update our state handle with the result from the registry, thus replacing
- // placeholder state handles with the originals.
- oldSstFileName.setValue(result.getReference());
- }
-
- // Migrate state from unregistered to registered, so that it will not count as private state
- // for #discardState() from now.
- referencedSharedState.putAll(createdSharedState);
- createdSharedState.clear();
-
- registered = true;
- }
-
- @Override
- public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
-
- Preconditions.checkState(registered, "The state handle has not registered its shared states yet.");
-
- for (Map.Entry<StateHandleID, StreamStateHandle> newSstFileEntry : createdSharedState.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(newSstFileEntry.getKey());
- stateRegistry.releaseReference(registryKey);
- }
-
- for (Map.Entry<StateHandleID, StreamStateHandle> oldSstFileEntry : referencedSharedState.entrySet()) {
- SharedStateRegistryKey registryKey =
- createSharedStateRegistryKeyFromFileName(oldSstFileEntry.getKey());
- stateRegistry.releaseReference(registryKey);
+ sharedStateHandle.setValue(result.getReference());
}
-
- registered = false;
}
- private SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
+ /**
+ * Create a unique key to register one of our shared state handles.
+ */
+ @VisibleForTesting
+ public SharedStateRegistryKey createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
return new SharedStateRegistryKey(operatorIdentifier + '-' + keyGroupRange, shId);
}
@@ -293,10 +258,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
return false;
}
- if (!getCreatedSharedState().equals(that.getCreatedSharedState())) {
- return false;
- }
- if (!getReferencedSharedState().equals(that.getReferencedSharedState())) {
+ if (!getSharedState().equals(that.getSharedState())) {
return false;
}
if (!getPrivateState().equals(that.getPrivateState())) {
@@ -314,8 +276,7 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
int result = getOperatorIdentifier().hashCode();
result = 31 * result + getKeyGroupRange().hashCode();
result = 31 * result + (int) (getCheckpointId() ^ (getCheckpointId() >>> 32));
- result = 31 * result + getCreatedSharedState().hashCode();
- result = 31 * result + getReferencedSharedState().hashCode();
+ result = 31 * result + getSharedState().hashCode();
result = 31 * result + getPrivateState().hashCode();
result = 31 * result + getMetaStateHandle().hashCode();
return result;
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8280460..8e38ad4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -98,11 +98,6 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
}
@Override
- public void unregisterSharedStates(SharedStateRegistry stateRegistry) {
- // No shared states
- }
-
- @Override
public void discardState() throws Exception {
stateHandle.discardState();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index 2136061..7c948a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -18,29 +18,20 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
/**
* A placeholder state handle for shared state that will replaced by an original that was
- * created in a previous checkpoint. So we don't have to send the handle twice, e.g. in
- * case of {@link ByteStreamStateHandle}. To be used in the referenced states of
+ * created in a previous checkpoint. So we don't have to send a state handle twice, e.g. in
+ * case of {@link ByteStreamStateHandle}. This class is used in the referenced states of
* {@link IncrementalKeyedStateHandle}.
- * <p>
- * IMPORTANT: This class currently overrides equals and hash code only for testing purposes. They
- * should not be called from production code. This means this class is also not suited to serve as
- * a key, e.g. in hash maps.
*/
public class PlaceholderStreamStateHandle implements StreamStateHandle {
private static final long serialVersionUID = 1L;
- /** We remember the size of the original file for which this is a placeholder */
- private final long originalSize;
-
- public PlaceholderStreamStateHandle(long originalSize) {
- this.originalSize = originalSize;
+ public PlaceholderStreamStateHandle() {
}
@Override
@@ -56,33 +47,6 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
@Override
public long getStateSize() {
- return originalSize;
- }
-
- /**
- * This method is should only be called in tests! This should never serve as key in a hash map.
- */
- @VisibleForTesting
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- PlaceholderStreamStateHandle that = (PlaceholderStreamStateHandle) o;
-
- return originalSize == that.originalSize;
- }
-
- /**
- * This method is should only be called in tests! This should never serve as key in a hash map.
- */
- @VisibleForTesting
- @Override
- public int hashCode() {
- return (int) (originalSize ^ (originalSize >>> 32));
+ return 0L;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index f9161b0..a5e0f84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -54,7 +54,7 @@ public class SharedStateRegistry {
}
/**
- * Register a reference to the given (supposedly new) shared state in the registry.
+ * Register a reference to the given shared state in the registry.
* This does the following: We check if the state handle is actually new by the
* registrationKey. If it is new, we register it with a reference count of 1. If there is
* already a state handle registered under the given key, we dispose the given "new" state
@@ -62,14 +62,14 @@ public class SharedStateRegistry {
* a replacement with the result.
*
* <p>IMPORTANT: caller should check the state handle returned by the result, because the
- * registry is performing deduplication and could potentially return a handle that is supposed
+ * registry is performing de-duplication and could potentially return a handle that is supposed
* to replace the one from the registration request.
*
* @param state the shared state for which we register a reference.
* @return the result of this registration request, consisting of the state handle that is
* registered under the key by the end of the oepration and its current reference count.
*/
- public Result registerNewReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
+ public Result registerReference(SharedStateRegistryKey registrationKey, StreamStateHandle state) {
Preconditions.checkNotNull(state);
@@ -96,28 +96,6 @@ public class SharedStateRegistry {
}
/**
- * Obtains one reference to the given shared state in the registry. This increases the
- * reference count by one.
- *
- * @param registrationKey the shared state for which we obtain a reference.
- * @return the shared state for which we release a reference.
- * @return the result of the request, consisting of the reference count after this operation
- * and the state handle.
- */
- public Result obtainReference(SharedStateRegistryKey registrationKey) {
-
- Preconditions.checkNotNull(registrationKey);
-
- synchronized (registeredStates) {
- SharedStateRegistry.SharedStateEntry entry =
- Preconditions.checkNotNull(registeredStates.get(registrationKey),
- "Could not find a state for the given registration key!");
- entry.increaseReferenceCount();
- return new Result(entry);
- }
- }
-
- /**
* Releases one reference to the given shared state in the registry. This decreases the
* reference count by one. Once the count reaches zero, the shared state is deleted.
*
@@ -125,7 +103,7 @@ public class SharedStateRegistry {
* @return the result of the request, consisting of the reference count after this operation
* and the state handle, or null if the state handle was deleted through this request.
*/
- public Result releaseReference(SharedStateRegistryKey registrationKey) {
+ public Result unregisterReference(SharedStateRegistryKey registrationKey) {
Preconditions.checkNotNull(registrationKey);
@@ -172,30 +150,18 @@ public class SharedStateRegistry {
}
}
- /**
- * Unregister all the shared states referenced by the given.
- *
- * @param stateHandles The shared states to unregister.
- */
- public void unregisterAll(Iterable<? extends CompositeStateHandle> stateHandles) {
- if (stateHandles == null) {
- return;
- }
-
- synchronized (registeredStates) {
- for (CompositeStateHandle stateHandle : stateHandles) {
- stateHandle.unregisterSharedStates(this);
- }
- }
- }
-
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
- if (streamStateHandle != null) {
+ // We do the small optimization to not issue discards for placeholders, which are NOPs.
+ if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
asyncDisposalExecutor.execute(
new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
}
}
+ private boolean isPlaceholder(StreamStateHandle stateHandle) {
+ return stateHandle instanceof PlaceholderStreamStateHandle;
+ }
+
/**
* An entry in the registry, tracking the handle and the corresponding reference count.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 42703f8..9ba9d35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -91,6 +91,13 @@ public class ByteStreamStateHandle implements StreamStateHandle {
return 31 * handleName.hashCode();
}
+ @Override
+ public String toString() {
+ return "ByteStreamStateHandle{" +
+ "handleName='" + handleName + '\'' +
+ '}';
+ }
+
/**
* An input stream view on a byte array.
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9250634..3b44d9a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -639,12 +639,6 @@ public class CheckpointCoordinatorTest {
assertEquals(checkpointIdNew, successNew.getCheckpointID());
assertTrue(successNew.getOperatorStates().isEmpty());
- // validate that the subtask states in old savepoint have unregister their shared states
- {
- verify(subtaskState1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
- verify(subtaskState2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
- }
-
// validate that the relevant tasks got a confirmation message
{
verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
@@ -925,9 +919,6 @@ public class CheckpointCoordinatorTest {
verify(subtaskState1_2, times(1)).discardState();
// validate that all subtask states in the second checkpoint are not discarded
- verify(subtaskState2_1, never()).unregisterSharedStates(any(SharedStateRegistry.class));
- verify(subtaskState2_2, never()).unregisterSharedStates(any(SharedStateRegistry.class));
- verify(subtaskState2_3, never()).unregisterSharedStates(any(SharedStateRegistry.class));
verify(subtaskState2_1, never()).discardState();
verify(subtaskState2_2, never()).discardState();
verify(subtaskState2_3, never()).discardState();
@@ -951,9 +942,6 @@ public class CheckpointCoordinatorTest {
coord.shutdown(JobStatus.FINISHED);
// validate that the states in the second checkpoint have been discarded
- verify(subtaskState2_1, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
- verify(subtaskState2_2, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
- verify(subtaskState2_3, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
verify(subtaskState2_1, times(1)).discardState();
verify(subtaskState2_2, times(1)).discardState();
verify(subtaskState2_3, times(1)).discardState();
@@ -1562,10 +1550,6 @@ public class CheckpointCoordinatorTest {
verify(subtaskState1, never()).discardState();
verify(subtaskState2, never()).discardState();
- // Savepoints are not supposed to have any shared state.
- verify(subtaskState1, never()).unregisterSharedStates(any(SharedStateRegistry.class));
- verify(subtaskState2, never()).unregisterSharedStates(any(SharedStateRegistry.class));
-
// validate that the relevant tasks got a confirmation message
{
verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointIdNew), eq(timestampNew), any(CheckpointOptions.class));
@@ -2088,15 +2072,6 @@ public class CheckpointCoordinatorTest {
// shutdown the store
store.shutdown(JobStatus.SUSPENDED);
- // All shared states should be unregistered once the store is shut down
- for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
- for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
- for (OperatorSubtaskState subtaskState : taskState.getStates()) {
- verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
- }
- }
- }
-
// restore the store
Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 985c662..fb5d7c3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
import org.junit.Test;
-import org.mockito.Mockito;
import java.io.IOException;
import java.util.Collection;
@@ -37,11 +37,6 @@ import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
/**
* Test for basic {@link CompletedCheckpointStore} contract.
@@ -114,12 +109,6 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
expected[i - 1].awaitDiscard();
assertTrue(expected[i - 1].isDiscarded());
assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
-
- for (OperatorState operatorState : taskStates) {
- for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
- verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
- }
- }
}
}
@@ -209,7 +198,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
operatorGroupState.put(operatorID, operatorState);
for (int i = 0; i < numberOfStates; i++) {
- OperatorSubtaskState subtaskState = mock(OperatorSubtaskState.class);
+ OperatorSubtaskState subtaskState =
+ new TestOperatorSubtaskState();
operatorState.putState(i, subtaskState);
}
@@ -217,18 +207,10 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
}
- protected void resetCheckpoint(Collection<OperatorState> operatorStates) {
- for (OperatorState operatorState : operatorStates) {
- for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
- Mockito.reset(subtaskState);
- }
- }
- }
-
protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStates, SharedStateRegistry registry) {
for (OperatorState operatorState : operatorStates) {
for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
- verify(subtaskState, times(1)).registerSharedStates(eq(registry));
+ Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).registered);
}
}
}
@@ -236,7 +218,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
for (OperatorState operatorState : operatorStates) {
for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
- verify(subtaskState, times(1)).discardState();
+ Assert.assertTrue(((TestOperatorSubtaskState)subtaskState).discarded);
}
}
}
@@ -333,4 +315,37 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
}
}
+ static class TestOperatorSubtaskState extends OperatorSubtaskState {
+ private static final long serialVersionUID = 522580433699164230L;
+
+ boolean registered;
+ boolean discarded;
+
+ public TestOperatorSubtaskState() {
+ super(null, null, null, null, null);
+ this.registered = false;
+ this.discarded = false;
+ }
+
+ @Override
+ public void discardState() {
+ super.discardState();
+ Assert.assertFalse(discarded);
+ discarded = true;
+ registered = false;
+ }
+
+ @Override
+ public void registerSharedStates(SharedStateRegistry sharedStateRegistry) {
+ super.registerSharedStates(sharedStateRegistry);
+ Assert.assertFalse(discarded);
+ registered = true;
+ }
+
+ public void reset() {
+ registered = false;
+ discarded = false;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 589ff46..0bbb961 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -100,7 +100,6 @@ public class CompletedCheckpointTest {
checkpoint.discardOnSubsume(sharedStateRegistry);
verify(state, times(1)).discardState();
- verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
}
/**
@@ -138,7 +137,6 @@ public class CompletedCheckpointTest {
checkpoint.discardOnShutdown(status, sharedStateRegistry);
verify(state, times(0)).discardState();
assertEquals(true, file.exists());
- verify(state, times(0)).unregisterSharedStates(sharedStateRegistry);
// Discard
props = new CheckpointProperties(false, false, true, true, true, true, true);
@@ -152,7 +150,6 @@ public class CompletedCheckpointTest {
checkpoint.discardOnShutdown(status, sharedStateRegistry);
verify(state, times(1)).discardState();
- verify(state, times(1)).unregisterSharedStates(sharedStateRegistry);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 6df01a0..a96b597 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -197,7 +197,6 @@ public class PendingCheckpointTest {
OperatorState state = mock(OperatorState.class);
doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
- doNothing().when(state).unregisterSharedStates(any(SharedStateRegistry.class));
String targetDir = tmpFolder.newFolder().getAbsolutePath();
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 0d93289..44c802b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
@@ -100,11 +101,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
- resetCheckpoint(expected[0].getOperatorStates().values());
- resetCheckpoint(expected[1].getOperatorStates().values());
- resetCheckpoint(expected[2].getOperatorStates().values());
-
- // Recover TODO!!! clear registry!
+ // Recover
checkpoints.recover();
assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index b63782d..f985573 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -34,7 +34,6 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
@@ -273,18 +272,17 @@ public class CheckpointTestUtils {
private CheckpointTestUtils() {}
- private static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
+ public static IncrementalKeyedStateHandle createDummyIncrementalKeyedStateHandle(Random rnd) {
return new IncrementalKeyedStateHandle(
createRandomUUID(rnd).toString(),
new KeyGroupRange(1, 1),
42L,
- createRandomOwnedHandleMap(rnd),
- createRandomReferencedHandleMap(rnd),
- createRandomOwnedHandleMap(rnd),
+ createRandomStateHandleMap(rnd),
+ createRandomStateHandleMap(rnd),
createDummyStreamStateHandle(rnd));
}
- private static Map<StateHandleID, StreamStateHandle> createRandomOwnedHandleMap(Random rnd) {
+ public static Map<StateHandleID, StreamStateHandle> createRandomStateHandleMap(Random rnd) {
final int size = rnd.nextInt(4);
Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
for (int i = 0; i < size; ++i) {
@@ -296,24 +294,13 @@ public class CheckpointTestUtils {
return result;
}
- private static Map<StateHandleID, StreamStateHandle> createRandomReferencedHandleMap(Random rnd) {
- final int size = rnd.nextInt(4);
- Map<StateHandleID, StreamStateHandle> result = new HashMap<>(size);
- for (int i = 0; i < size; ++i) {
- StateHandleID randomId = new StateHandleID(createRandomUUID(rnd).toString());
- result.put(randomId, new PlaceholderStreamStateHandle(rnd.nextInt(1024)));
- }
-
- return result;
- }
-
- private static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
+ public static KeyGroupsStateHandle createDummyKeyGroupStateHandle(Random rnd) {
return new KeyGroupsStateHandle(
new KeyGroupRangeOffsets(1, 1, new long[]{rnd.nextInt(1024)}),
createDummyStreamStateHandle(rnd));
}
- private static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
+ public static StreamStateHandle createDummyStreamStateHandle(Random rnd) {
return new TestByteStreamStateHandleDeepCompare(
String.valueOf(createRandomUUID(rnd)),
String.valueOf(createRandomUUID(rnd)).getBytes(ConfigConstants.DEFAULT_CHARSET));