You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2017/07/28 13:42:54 UTC
[2/2] flink git commit: [FLINK-7268] [checkpoints] Scope
SharedStateRegistry objects per (re)start
[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09caa9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09caa9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09caa9ff
Branch: refs/heads/release-1.3
Commit: 09caa9ffdc8168610c7d0260360c034ea87f904c
Parents: 0225db2
Author: Stefan Richter <s....@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Fri Jul 28 15:42:28 2017 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 13 +-
.../checkpoint/CheckpointCoordinator.java | 32 +-
.../runtime/checkpoint/CompletedCheckpoint.java | 3 +-
.../checkpoint/CompletedCheckpointStore.java | 5 +-
.../StandaloneCompletedCheckpointStore.java | 4 +-
.../ZooKeeperCompletedCheckpointStore.java | 12 +-
.../runtime/executiongraph/ExecutionGraph.java | 6 +-
.../state/IncrementalKeyedStateHandle.java | 68 ++-
.../runtime/state/KeyGroupsStateHandle.java | 2 +-
.../runtime/state/MultiStreamStateHandle.java | 10 +-
.../runtime/state/SharedStateRegistry.java | 52 ++-
.../state/SharedStateRegistryFactory.java | 35 ++
.../state/memory/ByteStreamStateHandle.java | 1 +
...tCoordinatorExternalizedCheckpointsTest.java | 22 +-
.../CheckpointCoordinatorFailureTest.java | 7 +-
.../CheckpointCoordinatorMasterHooksTest.java | 7 +-
.../checkpoint/CheckpointCoordinatorTest.java | 437 ++++++++++---------
.../checkpoint/CheckpointStateRestoreTest.java | 10 +-
...ZooKeeperCompletedCheckpointStoreITCase.java | 25 +-
.../ZooKeeperCompletedCheckpointStoreTest.java | 7 +-
.../state/IncrementalKeyedStateHandleTest.java | 75 +++-
.../RecoverableCompletedCheckpointStore.java | 33 +-
.../streaming/runtime/tasks/StreamTask.java | 1 -
...tractEventTimeWindowCheckpointingITCase.java | 85 +++-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 51 +++
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
...ckendEventTimeWindowCheckpointingITCase.java | 4 +-
31 files changed, 688 insertions(+), 339 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a6b53ec..7e0910e 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -231,7 +231,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
this.restoredKvStateMetaInfos = new HashMap<>();
this.materializedSstFiles = new TreeMap<>();
this.backendUID = UUID.randomUUID();
- LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
+
+ LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
+ this.operatorIdentifier,
+ this.backendUID);
}
/**
@@ -835,11 +838,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
void takeSnapshot() throws Exception {
assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
+ final long lastCompletedCheckpoint;
+
// use the last completed checkpoint as the comparison base.
synchronized (stateBackend.materializedSstFiles) {
- baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+ lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
+ baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
}
+ LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
+ "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
// save meta data
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
: stateBackend.kvStateInformation.entrySet()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82933ac..fe94d25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.util.Preconditions;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
@Nullable
private CheckpointStatsTracker statsTracker;
+ /** A factory for SharedStateRegistry objects */
+ private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
/** Registry that tracks state which is shared across (incremental) checkpoints */
- private final SharedStateRegistry sharedStateRegistry;
+ private SharedStateRegistry sharedStateRegistry;
// --------------------------------------------------------------------------------------------
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
@Nullable String checkpointDirectory,
- Executor executor) {
+ Executor executor,
+ SharedStateRegistryFactory sharedStateRegistryFactory) {
// sanity checks
checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
this.checkpointDirectory = checkpointDirectory;
this.executor = checkNotNull(executor);
- this.sharedStateRegistry = new SharedStateRegistry(executor);
+ this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
+ this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -1044,10 +1050,23 @@ public class CheckpointCoordinator {
throw new IllegalStateException("CheckpointCoordinator is shut down");
}
- // Recover the checkpoints
- completedCheckpointStore.recover(sharedStateRegistry);
+ // We create a new shared state registry object, so that all pending async disposal requests from previous
+ // runs will go against the old object (where they can do no harm).
+ // This must happen under the checkpoint lock.
+ sharedStateRegistry.close();
+ sharedStateRegistry = sharedStateRegistryFactory.create(executor);
+
+ // Recover the checkpoints, TODO this could be done only when there is a new leader, not on each recovery
+ completedCheckpointStore.recover();
+
+ // Now, we re-register all (shared) states from the checkpoint store with the new registry
+ for (CompletedCheckpoint completedCheckpoint : completedCheckpointStore.getAllCheckpoints()) {
+ completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+ }
+
+ LOG.debug("Status of the shared state registry after restore: {}.", sharedStateRegistry);
- // restore from the latest checkpoint
+ // Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
if (latest == null) {
@@ -1121,7 +1140,6 @@ public class CheckpointCoordinator {
CompletedCheckpoint savepoint = SavepointLoader.loadAndValidateSavepoint(
job, tasks, savepointPath, userClassLoader, allowNonRestored);
- savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpointStore.addCheckpoint(savepoint);
// Reset the checkpoint ID counter
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 56aa19d..76d1580 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
private void doDiscard() throws Exception {
+ LOG.trace("Executing discard procedure for {}.", this);
+
try {
// collect exceptions and continue cleanup
Exception exception = null;
@@ -225,7 +227,6 @@ public class CompletedCheckpoint implements Serializable {
// discard private state objects
try {
Collection<OperatorState> values = operatorStates.values();
- LOG.trace("About to discard operator states {}.", values);
StateUtil.bestEffortDiscardAllStateObjects(values);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
import java.util.List;
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
*
* <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
* available checkpoint.
- *
- * @param sharedStateRegistry the shared state registry to register recovered states.
*/
- void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+ void recover() throws Exception;
/**
* Adds a {@link CompletedCheckpoint} instance to the list of completed checkpoints.
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements CompletedCheckpointSt
}
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
// Nothing to do
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
package org.apache.flink.runtime.checkpoint;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* that the history of checkpoints is consistent.
*/
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
LOG.info("Recovering checkpoints from ZooKeeper.");
// Clear local handles in order to prevent duplicates on
// recovery. The local handles should reflect the state
// of ZooKeeper.
completedCheckpoints.clear();
- sharedStateRegistry.clear();
// Get all there is first
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
try {
completedCheckpoint = retrieveCompletedCheckpoint(checkpointStateHandle);
if (completedCheckpoint != null) {
- // Re-register all shared states in the checkpoint.
- completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
completedCheckpoints.add(completedCheckpoint);
}
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f9d2d69..c105d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -66,6 +66,7 @@ import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializedThrowable;
@@ -74,8 +75,8 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
-
import org.apache.flink.util.StringUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -459,7 +460,8 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
checkpointIDCounter,
checkpointStore,
checkpointDir,
- ioExecutor);
+ ioExecutor,
+ SharedStateRegistry.DEFAULT_FACTORY);
// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
private final UUID backendIdentifier;
/**
- * The key-group range covered by this state handle
+ * The key-group range covered by this state handle.
*/
private final KeyGroupRange keyGroupRange;
/**
- * The checkpoint Id
+ * The checkpoint Id.
*/
private final long checkpointId;
/**
- * Shared state in the incremental checkpoint. This i
+ * Shared state in the incremental checkpoint.
*/
private final Map<StateHandleID, StreamStateHandle> sharedState;
/**
- * Private state in the incremental checkpoint
+ * Private state in the incremental checkpoint.
*/
private final Map<StateHandleID, StreamStateHandle> privateState;
/**
- * Primary meta data state of the incremental checkpoint
+ * Primary meta data state of the incremental checkpoint.
*/
private final StreamStateHandle metaStateHandle;
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@Override
public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
- if (this.keyGroupRange.getIntersection(keyGroupRange) != KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
- return this;
- } else {
- return null;
- }
+ return KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange)) ?
+ null : this;
}
@Override
public void discardState() throws Exception {
+ SharedStateRegistry registry = this.sharedStateRegistry;
+ final boolean isRegistered = (registry != null);
+
+ LOG.trace("Discarding IncrementalKeyedStateHandle (registered = {}) for checkpoint {} from backend with id {}.",
+ isRegistered,
+ checkpointId,
+ backendIdentifier);
+
try {
metaStateHandle.discardState();
} catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
// If this was not registered, we can delete the shared state. We can simply apply this
// to all handles, because all handles that have not been created for the first time for this
// are only placeholders at this point (disposing them is a NOP).
- if (sharedStateRegistry == null) {
- try {
- StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
- } catch (Exception e) {
- LOG.warn("Could not properly discard new sst file states.", e);
- }
- } else {
+ if (isRegistered) {
// If this was registered, we only unregister all our referenced shared states
// from the registry.
for (StateHandleID stateHandleID : sharedState.keySet()) {
- sharedStateRegistry.unregisterReference(
+ registry.unregisterReference(
createSharedStateRegistryKeyFromFileName(stateHandleID));
}
+ } else {
+ // Otherwise, we assume to own those handles and dispose them directly.
+ try {
+ StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+ } catch (Exception e) {
+ LOG.warn("Could not properly discard new sst file states.", e);
+ }
}
}
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
@Override
public void registerSharedStates(SharedStateRegistry stateRegistry) {
- Preconditions.checkState(sharedStateRegistry == null, "The state handle has already registered its shared states.");
+ // This is a quick check to avoid that we register twice with the same registry. However, the code allows to
+ // register again with a different registry. The implication is that ownership is transferred to this new
+ // registry. This should only happen in case of a restart, when the CheckpointCoordinator creates a new
+ // SharedStateRegistry for the current attempt and the old registry becomes meaningless. We also assume that
+ // an old registry object from a previous run is due to be GCed and will never be used for registration again.
+ Preconditions.checkState(
+ sharedStateRegistry != stateRegistry,
+ "The state handle has already registered its shared states to the given registry.");
sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
+ LOG.trace("Registering IncrementalKeyedStateHandle for checkpoint {} from backend with id {}.",
+ checkpointId,
+ backendIdentifier);
+
for (Map.Entry<StateHandleID, StreamStateHandle> sharedStateHandle : sharedState.entrySet()) {
SharedStateRegistryKey registryKey =
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements KeyedStateHandle {
result = 31 * result + getMetaStateHandle().hashCode();
return result;
}
+
+ @Override
+ public String toString() {
+ return "IncrementalKeyedStateHandle{" +
+ "backendIdentifier=" + backendIdentifier +
+ ", keyGroupRange=" + keyGroupRange +
+ ", checkpointId=" + checkpointId +
+ ", sharedState=" + sharedState +
+ ", privateState=" + privateState +
+ ", metaStateHandle=" + metaStateHandle +
+ ", registered=" + (sharedStateRegistry != null) +
+ '}';
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
public String toString() {
return "KeyGroupsStateHandle{" +
"groupRangeOffsets=" + groupRangeOffsets +
- ", data=" + stateHandle +
+ ", stateHandle=" + stateHandle +
'}';
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements StreamStateHandle {
private final List<StreamStateHandle> stateHandles;
private final long stateSize;
- public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) throws IOException {
+ public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
this.stateHandles = Preconditions.checkNotNull(stateHandles);
long calculateSize = 0L;
for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements StreamStateHandle {
return stateSize;
}
+ @Override
+ public String toString() {
+ return "MultiStreamStateHandle{" +
+ "stateHandles=" + stateHandles +
+ ", stateSize=" + stateSize +
+ '}';
+ }
+
static final class MultiFSDataInputStream extends AbstractMultiFSDataInputStream {
private final TreeMap<Long, StreamStateHandle> stateHandleMap;
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
* maintain the reference count of {@link StreamStateHandle}s by a key that (logically) identifies
* them.
*/
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(SharedStateRegistry.class);
+ /** A singleton object for the default implementation of a {@link SharedStateRegistryFactory} */
+ public static final SharedStateRegistryFactory DEFAULT_FACTORY = new SharedStateRegistryFactory() {
+ @Override
+ public SharedStateRegistry create(Executor deleteExecutor) {
+ return new SharedStateRegistry(deleteExecutor);
+ }
+ };
+
/** All registered state objects by an artificial key */
private final Map<SharedStateRegistryKey, SharedStateRegistry.SharedStateEntry> registeredStates;
+ /** This flag indicates whether or not the registry is open or if close() was called */
+ private boolean open;
+
/** Executor for async state deletion */
private final Executor asyncDisposalExecutor;
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
public SharedStateRegistry(Executor asyncDisposalExecutor) {
this.registeredStates = new HashMap<>();
this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
+ this.open = true;
}
/**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
SharedStateRegistry.SharedStateEntry entry;
synchronized (registeredStates) {
+
+ Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");
+
entry = registeredStates.get(registrationKey);
if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
// delete if this is a real duplicate
if (!Objects.equals(state, entry.stateHandle)) {
scheduledStateDeletion = state;
+ LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " +
+ "be an unnecessary copy of existing state {} and will be dropped.",
+ registrationKey,
+ state,
+ entry.stateHandle);
}
entry.increaseReferenceCount();
}
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
*
* @param registrationKey the shared state for which we release a reference.
* @return the result of the request, consisting of the reference count after this operation
- * and the state handle, or null if the state handle was deleted through this request.
+ * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry
+ * was previously closed.
*/
public Result unregisterReference(SharedStateRegistryKey registrationKey) {
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
SharedStateRegistry.SharedStateEntry entry;
synchronized (registeredStates) {
+
entry = registeredStates.get(registrationKey);
Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
}
}
+ @Override
+ public String toString() {
+ synchronized (registeredStates) {
+ return "SharedStateRegistry{" +
+ "registeredStates=" + registeredStates +
+ '}';
+ }
+ }
+
private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
// We do the small optimization to not issue discards for placeholders, which are NOPs.
if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-
LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
asyncDisposalExecutor.execute(
new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
return stateHandle instanceof PlaceholderStreamStateHandle;
}
+ @Override
+ public void close() {
+ synchronized (registeredStates) {
+ open = false;
+ }
+ }
+
/**
* An entry in the registry, tracking the handle and the corresponding reference count.
*/
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
}
}
}
-
- /**
- * Clears the registry.
- */
- public void clear() {
- synchronized (registeredStates) {
- registeredStates.clear();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects.
+ */
+public interface SharedStateRegistryFactory {
+
+ /**
+ * Factory method for {@link SharedStateRegistry}.
+ *
+ * @param deleteExecutor executor used to run (async) deletes.
+ * @return a SharedStateRegistry object
+ */
+ SharedStateRegistry create(Executor deleteExecutor);
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
public String toString() {
return "ByteStreamStateHandle{" +
"handleName='" + handleName + '\'' +
+ ", dataBytes=" + data.length +
'}';
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
/**
* CheckpointCoordinator tests for externalized checkpoints.
*
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
checkpointDir.getAbsolutePath(),
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 344b340..5cca94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -78,7 +78,8 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new FailingCompletedCheckpointStore(),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.triggerCheckpoint(triggerTimestamp, false);
@@ -113,7 +114,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle);
AcknowledgeCheckpoint acknowledgeMessage = new AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new CheckpointMetrics(), subtaskState);
-
+
try {
coord.receiveAcknowledgeMessage(acknowledgeMessage);
fail("Expected a checkpoint exception because the completed checkpoint store could not " +
@@ -136,7 +137,7 @@ public class CheckpointCoordinatorFailureTest extends TestLogger {
private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
@Override
- public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+ public void recover() throws Exception {
throw new UnsupportedOperationException("Not implemented.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..94063a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.junit.Test;
-
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -47,14 +47,12 @@ import java.util.List;
import java.util.concurrent.Executor;
import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.any;
@@ -405,7 +403,8 @@ public class CheckpointCoordinatorMasterHooksTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
}
private static <T> T mockGeneric(Class<?> clazz) {
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..16a89ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.checkpoint;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -56,6 +54,9 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -140,7 +141,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -200,7 +202,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -251,7 +254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// nothing should be happening
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -303,7 +307,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -407,7 +412,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -526,7 +532,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -698,7 +705,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -828,7 +836,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -992,7 +1001,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger a checkpoint, partially acknowledged
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1019,8 +1029,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(250);
}
while (!checkpoint.isDiscarded() &&
- coord.getNumberOfPendingCheckpoints() > 0 &&
- System.currentTimeMillis() < deadline);
+ coord.getNumberOfPendingCheckpoints() > 0 &&
+ System.currentTimeMillis() < deadline);
assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1071,7 +1081,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1134,7 +1145,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1274,7 +1286,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1296,7 +1309,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
int numCallsSoFar = numCalls.get();
Thread.sleep(400);
assertTrue(numCallsSoFar == numCalls.get() ||
- numCallsSoFar+1 == numCalls.get());
+ numCallsSoFar+1 == numCalls.get());
// start another sequence of periodic scheduling
numCalls.set(0);
@@ -1318,7 +1331,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
numCallsSoFar = numCalls.get();
Thread.sleep(400);
assertTrue(numCallsSoFar == numCalls.get() ||
- numCallsSoFar + 1 == numCalls.get());
+ numCallsSoFar + 1 == numCalls.get());
coord.shutdown(JobStatus.FINISHED);
}
@@ -1354,19 +1367,20 @@ public class CheckpointCoordinatorTest extends TestLogger {
final long delay = 50;
final CheckpointCoordinator coord = new CheckpointCoordinator(
- jid,
- 2, // periodic interval is 2 ms
- 200_000, // timeout is very long (200 s)
- delay, // 50 ms delay between checkpoints
- 1,
- ExternalizedCheckpointSettings.none(),
- new ExecutionVertex[] { vertex },
- new ExecutionVertex[] { vertex },
- new ExecutionVertex[] { vertex },
- new StandaloneCheckpointIDCounter(),
- new StandaloneCompletedCheckpointStore(2),
- "dummy-path",
- Executors.directExecutor());
+ jid,
+ 2, // periodic interval is 2 ms
+ 200_000, // timeout is very long (200 s)
+ delay, // 50 ms delay between checkpoints
+ 1,
+ ExternalizedCheckpointSettings.none(),
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ new ExecutionVertex[] { vertex },
+ new StandaloneCheckpointIDCounter(),
+ new StandaloneCompletedCheckpointStore(2),
+ "dummy-path",
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
try {
coord.startCheckpointScheduler();
@@ -1439,7 +1453,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertEquals(0, coord.getNumberOfPendingCheckpoints());
assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1596,7 +1611,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
counter,
new StandaloneCompletedCheckpointStore(10),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -1702,7 +1718,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1715,12 +1732,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while ((now = System.currentTimeMillis()) < minDuration ||
- (numCalls.get() < maxConcurrentAttempts && now < timeout));
+ (numCalls.get() < maxConcurrentAttempts && now < timeout));
assertEquals(maxConcurrentAttempts, numCalls.get());
verify(triggerVertex.getCurrentExecutionAttempt(), times(maxConcurrentAttempts))
- .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
+ .triggerCheckpoint(anyLong(), anyLong(), any(CheckpointOptions.class));
// now, once we acknowledge one checkpoint, it should trigger the next one
coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
@@ -1775,7 +1792,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1788,7 +1806,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while ((now = System.currentTimeMillis()) < minDuration ||
- (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+ (coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
// validate that the pending checkpoints are there
assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1806,7 +1824,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while (coord.getPendingCheckpoints().get(4L) == null &&
- System.currentTimeMillis() < newTimeout);
+ System.currentTimeMillis() < newTimeout);
// do the final check
assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
@@ -1837,12 +1855,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
- new Answer<ExecutionState>() {
- @Override
- public ExecutionState answer(InvocationOnMock invocation){
- return currentState.get();
- }
- });
+ new Answer<ExecutionState>() {
+ @Override
+ public ExecutionState answer(InvocationOnMock invocation){
+ return currentState.get();
+ }
+ });
CheckpointCoordinator coord = new CheckpointCoordinator(
jid,
@@ -1857,7 +1875,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.startCheckpointScheduler();
@@ -1874,7 +1893,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
Thread.sleep(20);
}
while (System.currentTimeMillis() < timeout &&
- coord.getNumberOfPendingCheckpoints() == 0);
+ coord.getNumberOfPendingCheckpoints() == 0);
assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
}
@@ -1909,7 +1928,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointIDCounter,
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
List<Future<CompletedCheckpoint>> savepointFutures = new ArrayList<>();
@@ -1962,7 +1982,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(2),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
String savepointDir = tmpFolder.newFolder().getAbsolutePath();
@@ -2006,7 +2027,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
ExecutionVertex[] arrayExecutionVertices =
- allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore();
@@ -2024,7 +2045,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2051,11 +2073,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
SubtaskState subtaskState = mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- subtaskState);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ subtaskState);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2064,11 +2086,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
SubtaskState subtaskState = mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index));
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- subtaskState);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ subtaskState);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2150,7 +2172,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2167,11 +2190,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2182,11 +2205,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2251,7 +2274,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
ExecutionVertex[] arrayExecutionVertices =
- allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2267,7 +2290,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2277,22 +2301,22 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
List<KeyGroupRange> keyGroupPartitions2 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
ChainedStateHandle<StreamStateHandle> valueSizeTuple = generateStateForVertex(jobVertexID1, index);
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
- jobVertexID1, keyGroupPartitions1.get(index), false);
+ jobVertexID1, keyGroupPartitions1.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2302,15 +2326,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
ChainedStateHandle<StreamStateHandle> state = generateStateForVertex(jobVertexID2, index);
KeyGroupsStateHandle keyGroupState = generateKeyGroupState(
- jobVertexID2, keyGroupPartitions2.get(index), false);
+ jobVertexID2, keyGroupPartitions2.get(index), false);
SubtaskState checkpointStateHandles = new SubtaskState(state, null, null, keyGroupState, null);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2390,13 +2414,13 @@ public class CheckpointCoordinatorTest extends TestLogger {
int newParallelism2 = scaleOut ? 13 : 2;
final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
- jobVertexID1,
- parallelism1,
- maxParallelism1);
+ jobVertexID1,
+ parallelism1,
+ maxParallelism1);
final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(
- jobVertexID2,
- parallelism2,
- maxParallelism2);
+ jobVertexID2,
+ parallelism2,
+ maxParallelism2);
List<ExecutionVertex> allExecutionVertices = new ArrayList<>(parallelism1 + parallelism2);
@@ -2404,7 +2428,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
ExecutionVertex[] arrayExecutionVertices =
- allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
+ allExecutionVertices.toArray(new ExecutionVertex[allExecutionVertices.size()]);
// set up the coordinator and validate the initial state
CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2420,7 +2444,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger the checkpoint
coord.triggerCheckpoint(timestamp, false);
@@ -2430,9 +2455,9 @@ public class CheckpointCoordinatorTest extends TestLogger {
CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L);
List<KeyGroupRange> keyGroupPartitions1 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
List<KeyGroupRange> keyGroupPartitions2 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
//vertex 1
for (int index = 0; index < jobVertex1.getParallelism(); index++) {
@@ -2443,11 +2468,11 @@ public class CheckpointCoordinatorTest extends TestLogger {
SubtaskState checkpointStateHandles = new SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2463,14 +2488,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
expectedOpStatesBackend.add(opStateBackend);
expectedOpStatesRaw.add(opStateRaw);
SubtaskState checkpointStateHandles =
- new SubtaskState(new ChainedStateHandle<>(
- Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
+ new SubtaskState(new ChainedStateHandle<>(
+ Collections.<StreamStateHandle>singletonList(null)), opStateBackend, opStateRaw, keyedStateBackend, keyedStateRaw);
AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
- jid,
- jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
- checkpointId,
- new CheckpointMetrics(),
- checkpointStateHandles);
+ jid,
+ jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+ checkpointId,
+ new CheckpointMetrics(),
+ checkpointStateHandles);
coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
}
@@ -2482,18 +2507,18 @@ public class CheckpointCoordinatorTest extends TestLogger {
Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
List<KeyGroupRange> newKeyGroupPartitions2 =
- StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+ StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
- jobVertexID1,
- parallelism1,
- maxParallelism1);
+ jobVertexID1,
+ parallelism1,
+ maxParallelism1);
// rescale vertex 2
final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
- jobVertexID2,
- newParallelism2,
- maxParallelism2);
+ jobVertexID2,
+ newParallelism2,
+ maxParallelism2);
tasks.put(jobVertexID1, newJobVertex1);
tasks.put(jobVertexID2, newJobVertex2);
@@ -2534,7 +2559,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
return new Tuple2<>(jobVertexID, operatorID);
}
-
+
/**
* old topology
* [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2
@@ -2575,7 +2600,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
operatorStates.put(id.f1, taskState);
for (int index = 0; index < taskState.getParallelism(); index++) {
- StreamStateHandle subNonPartitionedState =
+ StreamStateHandle subNonPartitionedState =
generateStateForVertex(id.f0, index)
.get(0);
OperatorStateHandle subManagedOperatorState =
@@ -2673,15 +2698,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
spy(new StandaloneCompletedCheckpointStore(1));
CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
- jobID,
- 2,
- System.currentTimeMillis(),
- System.currentTimeMillis() + 3000,
- operatorStates,
- Collections.<MasterState>emptyList(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- null);
+ jobID,
+ 2,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 3000,
+ operatorStates,
+ Collections.<MasterState>emptyList(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ null);
when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
@@ -2699,7 +2724,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
standaloneCompletedCheckpointStore,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
coord.restoreLatestCheckpointedState(tasks, false, true);
@@ -2832,7 +2858,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
"fake-directory",
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -2862,14 +2889,14 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
List<Collection<OperatorStateHandle>> repartitionedStates =
- repartitioner.repartitionState(Collections.singletonList(osh), 3);
+ repartitioner.repartitionState(Collections.singletonList(osh), 3);
Map<String, Integer> checkCounts = new HashMap<>(3);
for (Collection<OperatorStateHandle> operatorStateHandles : repartitionedStates) {
for (OperatorStateHandle operatorStateHandle : operatorStateHandles) {
for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo :
- operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
+ operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
String stateName = stateNameToMetaInfo.getKey();
Integer count = checkCounts.get(stateName);
@@ -2900,8 +2927,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
// ------------------------------------------------------------------------
public static KeyGroupsStateHandle generateKeyGroupState(
- JobVertexID jobVertexID,
- KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
+ JobVertexID jobVertexID,
+ KeyGroupRange keyGroupPartition, boolean rawState) throws IOException {
List<Integer> testStatesLists = new ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
@@ -2918,27 +2945,27 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static KeyGroupsStateHandle generateKeyGroupState(
- KeyGroupRange keyGroupRange,
- List<? extends Serializable> states) throws IOException {
+ KeyGroupRange keyGroupRange,
+ List<? extends Serializable> states) throws IOException {
Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == states.size());
Tuple2<byte[], List<long[]>> serializedDataWithOffsets =
- serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
+ serializeTogetherAndTrackOffsets(Collections.<List<? extends Serializable>>singletonList(states));
KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
ByteStreamStateHandle allSerializedStatesHandle = new TestByteStreamStateHandleDeepCompare(
- String.valueOf(UUID.randomUUID()),
- serializedDataWithOffsets.f0);
+ String.valueOf(UUID.randomUUID()),
+ serializedDataWithOffsets.f0);
KeyGroupsStateHandle keyGroupsStateHandle = new KeyGroupsStateHandle(
- keyGroupRangeOffsets,
- allSerializedStatesHandle);
+ keyGroupRangeOffsets,
+ allSerializedStatesHandle);
return keyGroupsStateHandle;
}
public static Tuple2<byte[], List<long[]>> serializeTogetherAndTrackOffsets(
- List<List<? extends Serializable>> serializables) throws IOException {
+ List<List<? extends Serializable>> serializables) throws IOException {
List<long[]> offsets = new ArrayList<>(serializables.size());
List<byte[]> serializedGroupValues = new ArrayList<>();
@@ -2962,19 +2989,19 @@ public class CheckpointCoordinatorTest extends TestLogger {
runningGroupsOffset = 0;
for (byte[] serializedGroupValue : serializedGroupValues) {
System.arraycopy(
- serializedGroupValue,
- 0,
- allSerializedValuesConcatenated,
- runningGroupsOffset,
- serializedGroupValue.length);
+ serializedGroupValue,
+ 0,
+ allSerializedValuesConcatenated,
+ runningGroupsOffset,
+ serializedGroupValue.length);
runningGroupsOffset += serializedGroupValue.length;
}
return new Tuple2<>(allSerializedValuesConcatenated, offsets);
}
public static ChainedStateHandle<StreamStateHandle> generateStateForVertex(
- JobVertexID jobVertexID,
- int index) throws IOException {
+ JobVertexID jobVertexID,
+ int index) throws IOException {
Random random = new Random(jobVertexID.hashCode() + index);
int value = random.nextInt();
@@ -2982,17 +3009,17 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static ChainedStateHandle<StreamStateHandle> generateChainedStateHandle(
- Serializable value) throws IOException {
+ Serializable value) throws IOException {
return ChainedStateHandle.wrapSingleHandle(
- TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
+ TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()), value));
}
public static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
- JobVertexID jobVertexID,
- int index,
- int namedStates,
- int partitionsPerState,
- boolean rawState) throws IOException {
+ JobVertexID jobVertexID,
+ int index,
+ int namedStates,
+ int partitionsPerState,
+ boolean rawState) throws IOException {
Map<String, List<? extends Serializable>> statesListsMap = new HashMap<>(namedStates);
@@ -3015,7 +3042,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
private static ChainedStateHandle<OperatorStateHandle> generateChainedPartitionableStateHandle(
- Map<String, List<? extends Serializable>> states) throws IOException {
+ Map<String, List<? extends Serializable>> states) throws IOException {
List<List<? extends Serializable>> namedStateSerializables = new ArrayList<>(states.size());
@@ -3030,26 +3057,26 @@ public class CheckpointCoordinatorTest extends TestLogger {
int idx = 0;
for (Map.Entry<String, List<? extends Serializable>> entry : states.entrySet()) {
offsetsMap.put(
- entry.getKey(),
- new OperatorStateHandle.StateMetaInfo(
- serializationWithOffsets.f1.get(idx),
- OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+ entry.getKey(),
+ new OperatorStateHandle.StateMetaInfo(
+ serializationWithOffsets.f1.get(idx),
+ OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
++idx;
}
ByteStreamStateHandle streamStateHandle = new TestByteStreamStateHandleDeepCompare(
- String.valueOf(UUID.randomUUID()),
- serializationWithOffsets.f0);
+ String.valueOf(UUID.randomUUID()),
+ serializationWithOffsets.f0);
OperatorStateHandle operatorStateHandle =
- new OperatorStateHandle(offsetsMap, streamStateHandle);
+ new OperatorStateHandle(offsetsMap, streamStateHandle);
return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
}
static ExecutionJobVertex mockExecutionJobVertex(
- JobVertexID jobVertexID,
- int parallelism,
- int maxParallelism) {
+ JobVertexID jobVertexID,
+ int parallelism,
+ int maxParallelism) {
return mockExecutionJobVertex(
jobVertexID,
@@ -3131,7 +3158,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-
+
when(vertex.getJobVertex()).thenReturn(jobVertex);
return vertex;
@@ -3158,8 +3185,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static void verifyStateRestore(
- JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
- List<KeyGroupRange> keyGroupPartitions) throws Exception {
+ JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
+ List<KeyGroupRange> keyGroupPartitions) throws Exception {
for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
@@ -3168,28 +3195,28 @@ public class CheckpointCoordinatorTest extends TestLogger {
ChainedStateHandle<StreamStateHandle> expectNonPartitionedState = generateStateForVertex(jobVertexID, i);
ChainedStateHandle<StreamStateHandle> actualNonPartitionedState = taskStateHandles.getLegacyOperatorState();
assertTrue(CommonTestUtils.isSteamContentEqual(
- expectNonPartitionedState.get(0).openInputStream(),
- actualNonPartitionedState.get(0).openInputStream()));
+ expectNonPartitionedState.get(0).openInputStream(),
+ actualNonPartitionedState.get(0).openInputStream()));
ChainedStateHandle<OperatorStateHandle> expectedOpStateBackend =
- generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
+ generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
List<Collection<OperatorStateHandle>> actualPartitionableState = taskStateHandles.getManagedOperatorState();
assertTrue(CommonTestUtils.isSteamContentEqual(
- expectedOpStateBackend.get(0).openInputStream(),
- actualPartitionableState.get(0).iterator().next().openInputStream()));
+ expectedOpStateBackend.get(0).openInputStream(),
+ actualPartitionableState.get(0).iterator().next().openInputStream()));
KeyGroupsStateHandle expectPartitionedKeyGroupState = generateKeyGroupState(
- jobVertexID, keyGroupPartitions.get(i), false);
+ jobVertexID, keyGroupPartitions.get(i), false);
Collection<KeyedStateHandle> actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState();
compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), actualPartitionedKeyGroupState);
}
}
public static void compareKeyedState(
- Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
- Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
+ Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
+ Collection<? extends KeyedStateHandle> actualPartitionedKeyGroupState) throws Exception {
KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = expectPartitionedKeyGroupState.iterator().next();
int expectedTotalKeyGroups = expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
@@ -3207,7 +3234,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
inputStream.seek(offset);
int expectedKeyGroupState =
- InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
+ InstantiationUtil.deserializeObject(inputStream, Thread.currentThread().getContextClassLoader());
for (KeyedStateHandle oneActualKeyedStateHandle : actualPartitionedKeyGroupState) {
assertTrue(oneActualKeyedStateHandle instanceof KeyGroupsStateHandle);
@@ -3218,7 +3245,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
actualInputStream.seek(actualOffset);
int actualGroupState = InstantiationUtil.
- deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
+ deserializeObject(actualInputStream, Thread.currentThread().getContextClassLoader());
assertEquals(expectedKeyGroupState, actualGroupState);
}
}
@@ -3228,8 +3255,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
public static void comparePartitionableState(
- List<ChainedStateHandle<OperatorStateHandle>> expected,
- List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
+ List<ChainedStateHandle<OperatorStateHandle>> expected,
+ List<List<Collection<OperatorStateHandle>>> actual) throws Exception {
List<String> expectedResult = new ArrayList<>();
for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle : expected) {
@@ -3263,7 +3290,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
for (long offset : entry.getValue().getOffsets()) {
in.seek(offset);
Integer state = InstantiationUtil.
- deserializeObject(in, Thread.currentThread().getContextClassLoader());
+ deserializeObject(in, Thread.currentThread().getContextClassLoader());
resultCollector.add(opIdx + " : " + entry.getKey() + " : " + state);
}
}
@@ -3308,24 +3335,25 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// Periodic
CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- true);
+ System.currentTimeMillis(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ true);
assertTrue(triggerResult.isFailure());
assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
// Not periodic
triggerResult = coord.triggerCheckpoint(
- System.currentTimeMillis(),
- CheckpointProperties.forStandardCheckpoint(),
- null,
- false);
+ System.currentTimeMillis(),
+ CheckpointProperties.forStandardCheckpoint(),
+ null,
+ false);
assertFalse(triggerResult.isFailure());
}
@@ -3352,12 +3380,12 @@ public class CheckpointCoordinatorTest extends TestLogger {
int maxPartitionsPerState = 1 + r.nextInt(9);
doTestPartitionableStateRepartitioning(
- r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
+ r, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
}
}
private void doTestPartitionableStateRepartitioning(
- Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
+ Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
List<OperatorStateHandle> previousParallelOpInstanceStates = new ArrayList<>(oldParallelism);
@@ -3374,15 +3402,15 @@ public class CheckpointCoordinatorTest extends TestLogger {
}
OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ?
- OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+ OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
namedStatesToOffsets.put(
- "State-" + s,
- new OperatorStateHandle.StateMetaInfo(offs, mode));
+ "State-" + s,
+ new OperatorStateHandle.StateMetaInfo(offs, mode));
}
previousParallelOpInstanceStates.add(
- new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+ new OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
}
Map<StreamStateHandle, Map<String, List<Long>>> expected = new HashMap<>();
@@ -3395,7 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
long[] offs = e.getValue().getOffsets();
int replication = e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ?
- newParallelism : 1;
+ newParallelism : 1;
expectedTotalPartitions += replication * offs.length;
List<Long> offsList = new ArrayList<>(offs.length);
@@ -3413,7 +3441,7 @@ public class CheckpointCoordinatorTest extends TestLogger {
OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
List<Collection<OperatorStateHandle>> pshs =
- repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
+ repartitioner.repartitionState(previousParallelOpInstanceStates, newParallelism);
Map<StreamStateHandle, Map<String, List<Long>>> actual = new HashMap<>();
@@ -3486,7 +3514,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
coord.setCheckpointStatsTracker(tracker);
@@ -3524,7 +3553,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
new StandaloneCheckpointIDCounter(),
store,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
store.addCheckpoint(new CompletedCheckpoint(
new JobID(),
@@ -3580,7 +3610,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
checkpointIDCounter,
completedCheckpointStore,
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// trigger a first checkpoint
assertTrue(
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7d24568..0888cff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.util.SerializableObject;
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
// create ourselves a checkpoint with state
final long timestamp = 34623786L;
@@ -183,7 +185,8 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
try {
coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -240,7 +243,8 @@ public class CheckpointStateRestoreTest {
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(1),
null,
- Executors.directExecutor());
+ Executors.directExecutor(),
+ SharedStateRegistry.DEFAULT_FACTORY);
StreamStateHandle serializedState = CheckpointCoordinatorTest
.generateChainedStateHandle(new SerializableObject())