You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/08/31 17:28:37 UTC
[19/27] flink git commit: [FLINK-4381] Refactor State to Prepare For
Key-Group State Backends
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index ac4503d..9025090 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -18,46 +18,56 @@
package org.apache.flink.runtime.checkpoint;
+import com.google.common.collect.Iterables;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
-import java.io.Serializable;
+import java.io.IOException;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
/**
- * Simple container class which contains the task state and key-value state handles for the sub
+ * Simple container class which contains the task state and key-group state handles for the sub
* tasks of a {@link org.apache.flink.runtime.jobgraph.JobVertex}.
*
- * This class basically groups all tasks and key groups belonging to the same job vertex together.
+ * This class basically groups all non-partitioned state and key-group state belonging to the same job vertex together.
*/
-public class TaskState implements Serializable {
+public class TaskState implements StateObject {
private static final long serialVersionUID = -4845578005863201810L;
private final JobVertexID jobVertexID;
- /** Map of task states which can be accessed by their sub task index */
+ /** handles to non-partitioned states, subtaskindex -> subtaskstate */
private final Map<Integer, SubtaskState> subtaskStates;
- /** Map of key-value states which can be accessed by their key group index */
- private final Map<Integer, KeyGroupState> kvStates;
+ /** handles to partitioned states, subtaskindex -> keyed state */
+ private final Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles;
- /** Parallelism of the operator when it was checkpointed */
+ /** parallelism of the operator when it was checkpointed */
private final int parallelism;
- public TaskState(JobVertexID jobVertexID, int parallelism) {
- this.jobVertexID = jobVertexID;
+ /** maximum parallelism of the operator when the job was first created */
+ private final int maxParallelism;
- this.subtaskStates = new HashMap<>(parallelism);
+ public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) {
+ Preconditions.checkArgument(
+ parallelism <= maxParallelism,
+ "Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");
- this.kvStates = new HashMap<>();
+ this.jobVertexID = jobVertexID;
+ // presize both maps for the expected number of subtasks; entries are keyed by subtask index
+ this.subtaskStates = new HashMap<>(parallelism);
+ this.keyGroupsStateHandles = new HashMap<>(parallelism);
this.parallelism = parallelism;
+ this.maxParallelism = maxParallelism;
}
public JobVertexID getJobVertexID() {
@@ -65,6 +75,8 @@ public class TaskState implements Serializable {
}
public void putState(int subtaskIndex, SubtaskState subtaskState) {
+ Preconditions.checkNotNull(subtaskState);
+
if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
" exceeds the maximum number of sub tasks " + subtaskStates.size());
@@ -73,31 +85,38 @@ public class TaskState implements Serializable {
}
}
- public SubtaskState getState(int subtaskIndex) {
+ public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle keyGroupsStateHandle) {
+ Preconditions.checkNotNull(keyGroupsStateHandle);
+
if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
- " exceeds the maximum number of sub tasks " + subtaskStates.size());
+ " exceeds the maximum number of sub tasks " + subtaskStates.size());
} else {
- return subtaskStates.get(subtaskIndex);
+ keyGroupsStateHandles.put(subtaskIndex, keyGroupsStateHandle);
}
}
- public Collection<SubtaskState> getStates() {
- return subtaskStates.values();
- }
-
- public long getStateSize() {
- long result = 0L;
- for (SubtaskState subtaskState : subtaskStates.values()) {
- result += subtaskState.getStateSize();
+ public SubtaskState getState(int subtaskIndex) {
+ if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+ throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+ " exceeds the maximum number of sub tasks " + subtaskStates.size());
+ } else {
+ return subtaskStates.get(subtaskIndex);
}
+ }
- for (KeyGroupState keyGroupState : kvStates.values()) {
- result += keyGroupState.getStateSize();
+ public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) {
+ if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+ throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+ " exceeds the maximum number of sub tasks " + keyGroupsStateHandles.size());
+ } else {
+ return keyGroupsStateHandles.get(subtaskIndex);
}
+ }
- return result;
+ public Collection<SubtaskState> getStates() {
+ return subtaskStates.values();
}
public int getNumberCollectedStates() {
@@ -108,48 +127,44 @@ public class TaskState implements Serializable {
return parallelism;
}
- public void putKvState(int keyGroupId, KeyGroupState keyGroupState) {
- kvStates.put(keyGroupId, keyGroupState);
+ public int getMaxParallelism() {
+ return maxParallelism;
}
- public KeyGroupState getKvState(int keyGroupId) {
- return kvStates.get(keyGroupId);
+ public Collection<KeyGroupsStateHandle> getKeyGroupStates() {
+ return keyGroupsStateHandles.values();
}
- /**
- * Retrieve the set of key-value state key groups specified by the given key group partition set.
- * The key groups are returned as a map where the key group index maps to the serialized state
- * handle of the key group.
- *
- * @param keyGroupPartition Set of key group indices
- * @return Map of serialized key group state handles indexed by their key group index.
- */
- public Map<Integer, SerializedValue<StateHandle<?>>> getUnwrappedKvStates(Set<Integer> keyGroupPartition) {
- HashMap<Integer, SerializedValue<StateHandle<?>>> result = new HashMap<>(keyGroupPartition.size());
-
- for (Integer keyGroupId : keyGroupPartition) {
- KeyGroupState keyGroupState = kvStates.get(keyGroupId);
-
- if (keyGroupState != null) {
- result.put(keyGroupId, kvStates.get(keyGroupId).getKeyGroupState());
+ public boolean hasNonPartitionedState() {
+ for(SubtaskState sts : subtaskStates.values()) {
+ if (sts != null && !sts.getChainedStateHandle().isEmpty()) {
+ return true;
}
}
-
- return result;
+ return false;
}
- public int getNumberCollectedKvStates() {
- return kvStates.size();
+ @Override
+ public void discardState() throws Exception {
+ StateUtil.bestEffortDiscardAllStateObjects(
+ Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
}
- public void discard(ClassLoader classLoader) throws Exception {
- for (SubtaskState subtaskState : subtaskStates.values()) {
- subtaskState.discard(classLoader);
- }
- for (KeyGroupState keyGroupState : kvStates.values()) {
- keyGroupState.discard(classLoader);
+ @Override
+ public long getStateSize() throws Exception {
+ long result = 0L;
+
+ for (int i = 0; i < parallelism; i++) {
+ if (subtaskStates.get(i) != null) {
+ result += subtaskStates.get(i).getStateSize();
+ }
+ if (keyGroupsStateHandles.get(i) != null) {
+ result += keyGroupsStateHandles.get(i).getStateSize();
+ }
}
+
+ return result;
}
@Override
@@ -158,7 +173,7 @@ public class TaskState implements Serializable {
TaskState other = (TaskState) obj;
return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
- subtaskStates.equals(other.subtaskStates) && kvStates.equals(other.kvStates);
+ subtaskStates.equals(other.subtaskStates) && keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
} else {
return false;
}
@@ -166,6 +181,20 @@ public class TaskState implements Serializable {
@Override
public int hashCode() {
- return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, kvStates);
+ return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, keyGroupsStateHandles);
+ }
+
+ @Override
+ public void close() throws IOException {
+ StateUtil.bestEffortCloseAllStateObjects(
+ Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
+ }
+
+ public Map<Integer, SubtaskState> getSubtaskStates() {
+ return Collections.unmodifiableMap(subtaskStates);
+ }
+
+ public Map<Integer, KeyGroupsStateHandle> getKeyGroupsStateHandles() {
+ return Collections.unmodifiableMap(keyGroupsStateHandles);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index 376ef70..b826d9f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,9 +24,9 @@ import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
private final ClassLoader userClassLoader;
/** Local completed checkpoints. */
- private final ArrayDeque<Tuple2<StateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
+ private final ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> checkpointStateHandles;
/**
* Creates a {@link ZooKeeperCompletedCheckpointStore} instance.
@@ -101,7 +101,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
ClassLoader userClassLoader,
CuratorFramework client,
String checkpointsPath,
- StateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
+ RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage) throws Exception {
checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at least one checkpoint.");
checkNotNull(stateStorage, "State storage");
@@ -143,7 +143,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
checkpointStateHandles.clear();
// Get all there is first
- List<Tuple2<StateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
+ List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> initialCheckpoints;
while (true) {
try {
initialCheckpoints = checkpointsInZooKeeper.getAllSortedByName();
@@ -161,10 +161,10 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
if (numberOfInitialCheckpoints > 0) {
// Take the last one. This is the latest checkpoint, because path names are strictly
// increasing (checkpoint ID).
- Tuple2<StateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
+ Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> latest = initialCheckpoints
.get(numberOfInitialCheckpoints - 1);
- CompletedCheckpoint latestCheckpoint = latest.f0.getState(userClassLoader);
+ CompletedCheckpoint latestCheckpoint = latest.f0.retrieveState();
checkpointStateHandles.add(latest);
@@ -193,7 +193,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
// First add the new one. If it fails, we don't want to lose existing data.
String path = String.format("/%s", checkpoint.getCheckpointID());
- final StateHandle<CompletedCheckpoint> stateHandle = checkpointsInZooKeeper.add(path, checkpoint);
+ final RetrievableStateHandle<CompletedCheckpoint> stateHandle =
+ checkpointsInZooKeeper.add(path, checkpoint);
checkpointStateHandles.addLast(new Tuple2<>(stateHandle, path));
@@ -211,7 +212,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
return null;
}
else {
- return checkpointStateHandles.getLast().f0.getState(userClassLoader);
+ return checkpointStateHandles.getLast().f0.retrieveState();
}
}
@@ -219,8 +220,8 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
List<CompletedCheckpoint> checkpoints = new ArrayList<>(checkpointStateHandles.size());
- for (Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
- checkpoints.add(stateHandle.f0.getState(userClassLoader));
+ for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandle : checkpointStateHandles) {
+ checkpoints.add(stateHandle.f0.retrieveState());
}
return checkpoints;
@@ -235,7 +236,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
public void shutdown() throws Exception {
LOG.info("Shutting down");
- for (Tuple2<StateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
+ for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : checkpointStateHandles) {
try {
removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
}
@@ -264,7 +265,7 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
* Removes the state handle from ZooKeeper, discards the checkpoints, and the state handle.
*/
private void removeFromZooKeeperAndDiscardCheckpoint(
- final Tuple2<StateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
+ final Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) throws Exception {
final BackgroundCallback callback = new BackgroundCallback() {
@Override
@@ -273,16 +274,15 @@ public class ZooKeeperCompletedCheckpointStore implements CompletedCheckpointSto
if (event.getType() == CuratorEventType.DELETE) {
if (event.getResultCode() == 0) {
// The checkpoint
- CompletedCheckpoint checkpoint = stateHandleAndPath
- .f0.getState(userClassLoader);
-
- checkpoint.discard(userClassLoader);
-
- // Discard the state handle
- stateHandleAndPath.f0.discardState();
-
- // Discard the checkpoint
- LOG.debug("Discarded " + checkpoint);
+ try {
+ CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
+ checkpoint.discardState();
+ // Discard the checkpoint
+ LOG.debug("Discarded " + checkpoint);
+ } finally {
+ // Discard the state handle
+ stateHandleAndPath.f0.discardState();
+ }
}
else {
throw new IllegalStateException("Unexpected result code " +
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
index 6efc01e..49f51be 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
@@ -142,13 +142,13 @@ public class FsSavepointStore implements SavepointStore {
}
@Override
- public void disposeSavepoint(String path, ClassLoader classLoader) throws Exception {
+ public void disposeSavepoint(String path) throws Exception {
Preconditions.checkNotNull(path, "Path");
- Preconditions.checkNotNull(classLoader, "Class loader");
try {
Savepoint savepoint = loadSavepoint(path);
- savepoint.dispose(classLoader);
+ LOG.info("Disposing savepoint: " + path);
+ savepoint.dispose();
Path filePath = new Path(path);
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
index cf30f5f..2cf8f31 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
@@ -108,9 +108,8 @@ public class HeapSavepointStore implements SavepointStore {
}
@Override
- public void disposeSavepoint(String path, ClassLoader classLoader) throws Exception {
+ public void disposeSavepoint(String path) throws Exception {
Preconditions.checkNotNull(path, "Path");
- Preconditions.checkNotNull(classLoader, "Class loader");
Savepoint savepoint;
synchronized (shutDownLock) {
@@ -118,7 +117,7 @@ public class HeapSavepointStore implements SavepointStore {
}
if (savepoint != null) {
- savepoint.dispose(classLoader);
+ savepoint.dispose();
} else {
throw new IllegalArgumentException("Invalid path '" + path + "'.");
}
@@ -131,7 +130,7 @@ public class HeapSavepointStore implements SavepointStore {
// available at this point.
for (Savepoint savepoint : savepoints.values()) {
try {
- savepoint.dispose(ClassLoader.getSystemClassLoader());
+ savepoint.dispose();
} catch (Throwable t) {
LOG.warn("Failed to dispose savepoint " + savepoint.getCheckpointId(), t);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
index 7823ab9..643f14c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/Savepoint.java
@@ -68,13 +68,7 @@ public interface Savepoint {
/**
* Disposes the savepoint.
- *
- * <p>The class loader is needed, because savepoints can currently point to
- * arbitrary snapshot {@link org.apache.flink.runtime.state.StateHandle}
- * instances, which need the user code class loader for deserialization.
- *
- * @param classLoader Class loader for disposal
*/
- void dispose(ClassLoader classLoader) throws Exception;
+ void dispose() throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 1be7a58..47917b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -60,12 +60,12 @@ public class SavepointLoader {
ExecutionJobVertex executionJobVertex = tasks.get(taskState.getJobVertexID());
if (executionJobVertex != null) {
- if (executionJobVertex.getParallelism() == taskState.getParallelism()) {
+ if (executionJobVertex.getMaxParallelism() == taskState.getMaxParallelism()) {
taskStates.put(taskState.getJobVertexID(), taskState);
}
else {
String msg = String.format("Failed to rollback to savepoint %s. " +
- "Parallelism mismatch between savepoint state and new program. " +
+ "Max parallelism mismatch between savepoint state and new program. " +
"Cannot map operator %s with parallelism %d to new program with " +
"parallelism %d. This indicates that the program has been changed " +
"in a non-compatible way after the savepoint.",
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
index d06f3d0..20b3d89 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializers.java
@@ -28,10 +28,13 @@ import java.util.Map;
*/
public class SavepointSerializers {
+
+ private static final int SAVEPOINT_VERSION_0 = 0;
private static final Map<Integer, SavepointSerializer<?>> SERIALIZERS = new HashMap<>(1);
static {
- SERIALIZERS.put(SavepointV0.VERSION, SavepointV0Serializer.INSTANCE);
+ SERIALIZERS.put(SAVEPOINT_VERSION_0, null);
+ SERIALIZERS.put(SavepointV1.VERSION, SavepointV1Serializer.INSTANCE);
}
/**
@@ -66,7 +69,7 @@ public class SavepointSerializers {
if (serializer != null) {
return serializer;
} else {
- throw new IllegalArgumentException("Unknown savepoint version " + version + ".");
+ throw new IllegalArgumentException("Cannot restore savepoint version " + version + ".");
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 71fcb34..68b88d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -48,15 +48,10 @@ public interface SavepointStore {
/**
* Disposes the savepoint at the specified path.
*
- * <p>The class loader is needed, because savepoints can currently point to
- * arbitrary snapshot {@link org.apache.flink.runtime.state.StateHandle}
- * instances, which need the user code class loader for deserialization.
- *
* @param path Path of savepoint to dispose
- * @param classLoader Class loader for disposal
* @throws Exception Failures during disposal are forwarded
*/
- void disposeSavepoint(String path, ClassLoader classLoader) throws Exception;
+ void disposeSavepoint(String path) throws Exception;
/**
* Shuts down the savepoint store.
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
deleted file mode 100644
index d60d80e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.util.Preconditions;
-
-import java.util.ArrayList;
-import java.util.Collection;
-
-/**
- * Savepoint version 0.
- *
- * <p>This format was introduced with Flink 1.1.0.
- */
-public class SavepointV0 implements Savepoint {
-
- /** The savepoint version. */
- public static final int VERSION = 0;
-
- /** The checkpoint ID */
- private final long checkpointId;
-
- /** The task states */
- private final Collection<TaskState> taskStates = new ArrayList();
-
- public SavepointV0(long checkpointId, Collection<TaskState> taskStates) {
- this.checkpointId = checkpointId;
- this.taskStates.addAll(taskStates);
- }
-
- @Override
- public int getVersion() {
- return VERSION;
- }
-
- @Override
- public long getCheckpointId() {
- return checkpointId;
- }
-
- @Override
- public Collection<TaskState> getTaskStates() {
- return taskStates;
- }
-
- @Override
- public void dispose(ClassLoader classLoader) throws Exception {
- Preconditions.checkNotNull(classLoader, "Class loader");
- for (TaskState taskState : taskStates) {
- taskState.discard(classLoader);
- }
- taskStates.clear();
- }
-
- @Override
- public String toString() {
- return "Savepoint(version=" + VERSION + ")";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
-
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- SavepointV0 that = (SavepointV0) o;
- return checkpointId == that.checkpointId && getTaskStates().equals(that.getTaskStates());
- }
-
- @Override
- public int hashCode() {
- int result = (int) (checkpointId ^ (checkpointId >>> 32));
- result = 31 * result + taskStates.hashCode();
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
deleted file mode 100644
index e82b85f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Serializer.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.KeyGroupState;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * Serializer for {@link SavepointV0} instances.
- *
- * <p>In contrast to previous savepoint versions, this serializer makes sure
- * that no default Java serialization is used for serialization. Therefore, we
- * don't rely on any involved Java classes to stay the same.
- */
-class SavepointV0Serializer implements SavepointSerializer<SavepointV0> {
-
- public static final SavepointV0Serializer INSTANCE = new SavepointV0Serializer();
-
- private SavepointV0Serializer() {
- }
-
- @Override
- public void serialize(SavepointV0 savepoint, DataOutputStream dos) throws IOException {
- dos.writeLong(savepoint.getCheckpointId());
-
- Collection<TaskState> taskStates = savepoint.getTaskStates();
- dos.writeInt(taskStates.size());
-
- for (TaskState taskState : savepoint.getTaskStates()) {
- // Vertex ID
- dos.writeLong(taskState.getJobVertexID().getLowerPart());
- dos.writeLong(taskState.getJobVertexID().getUpperPart());
-
- // Parallelism
- int parallelism = taskState.getParallelism();
- dos.writeInt(parallelism);
-
- // Sub task states
- dos.writeInt(taskState.getNumberCollectedStates());
-
- for (int i = 0; i < parallelism; i++) {
- SubtaskState subtaskState = taskState.getState(i);
-
- if (subtaskState != null) {
- dos.writeInt(i);
-
- SerializedValue<?> serializedValue = subtaskState.getState();
- if (serializedValue == null) {
- dos.writeInt(-1); // null
- } else {
- byte[] serialized = serializedValue.getByteArray();
- dos.writeInt(serialized.length);
- dos.write(serialized, 0, serialized.length);
- }
-
- dos.writeLong(subtaskState.getStateSize());
- dos.writeLong(subtaskState.getDuration());
- }
- }
-
- // Key group states
- dos.writeInt(taskState.getNumberCollectedKvStates());
-
- for (int i = 0; i < parallelism; i++) {
- KeyGroupState keyGroupState = taskState.getKvState(i);
-
- if (keyGroupState != null) {
- dos.write(i);
-
- SerializedValue<?> serializedValue = keyGroupState.getKeyGroupState();
- if (serializedValue == null) {
- dos.writeInt(-1); // null
- } else {
- byte[] serialized = serializedValue.getByteArray();
- dos.writeInt(serialized.length);
- dos.write(serialized, 0, serialized.length);
- }
-
- dos.writeLong(keyGroupState.getStateSize());
- dos.writeLong(keyGroupState.getDuration());
- }
- }
- }
- }
-
- @Override
- public SavepointV0 deserialize(DataInputStream dis) throws IOException {
- long checkpointId = dis.readLong();
-
- // Task states
- int numTaskStates = dis.readInt();
- List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
- for (int i = 0; i < numTaskStates; i++) {
- JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
- int parallelism = dis.readInt();
-
- // Add task state
- TaskState taskState = new TaskState(jobVertexId, parallelism);
- taskStates.add(taskState);
-
- // Sub task states
- int numSubTaskStates = dis.readInt();
- for (int j = 0; j < numSubTaskStates; j++) {
- int subtaskIndex = dis.readInt();
-
- int length = dis.readInt();
-
- SerializedValue<StateHandle<?>> serializedValue;
- if (length == -1) {
- serializedValue = new SerializedValue<>(null);
- } else {
- byte[] serializedData = new byte[length];
- dis.readFully(serializedData, 0, length);
- serializedValue = SerializedValue.fromBytes(serializedData);
- }
-
- long stateSize = dis.readLong();
- long duration = dis.readLong();
-
- SubtaskState subtaskState = new SubtaskState(
- serializedValue,
- stateSize,
- duration);
-
- taskState.putState(subtaskIndex, subtaskState);
- }
-
- // Key group states
- int numKvStates = dis.readInt();
- for (int j = 0; j < numKvStates; j++) {
- int keyGroupIndex = dis.readInt();
-
- int length = dis.readInt();
-
- SerializedValue<StateHandle<?>> serializedValue;
- if (length == -1) {
- serializedValue = new SerializedValue<>(null);
- } else {
- byte[] serializedData = new byte[length];
- dis.readFully(serializedData, 0, length);
- serializedValue = SerializedValue.fromBytes(serializedData);
- }
-
- long stateSize = dis.readLong();
- long duration = dis.readLong();
-
- KeyGroupState keyGroupState = new KeyGroupState(
- serializedValue,
- stateSize,
- duration);
-
- taskState.putKvState(keyGroupIndex, keyGroupState);
- }
- }
-
- return new SavepointV0(checkpointId, taskStates);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
new file mode 100644
index 0000000..5976bbf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+/**
+ * Savepoint version 1.
+ *
+ * <p>This format was introduced with Flink 1.1.0.
+ */
+public class SavepointV1 implements Savepoint {
+
+ /** Version number identifying this savepoint format. */
+ public static final int VERSION = 1;
+
+ /** ID of the checkpoint this savepoint was created from. */
+ private final long checkpointId;
+
+ /** Task states contained in this savepoint. */
+ private final Collection<TaskState> taskStates;
+
+ /**
+  * Creates a savepoint holding the given task states.
+  *
+  * @param checkpointId The checkpoint ID.
+  * @param taskStates The task states; must not be null.
+  */
+ public SavepointV1(long checkpointId, Collection<TaskState> taskStates) {
+     this.taskStates = Preconditions.checkNotNull(taskStates, "Task States");
+     this.checkpointId = checkpointId;
+ }
+
+ @Override
+ public int getVersion() {
+     return VERSION;
+ }
+
+ @Override
+ public long getCheckpointId() {
+     return checkpointId;
+ }
+
+ @Override
+ public Collection<TaskState> getTaskStates() {
+     return taskStates;
+ }
+
+ /**
+  * Discards the state of every contained task state and then empties the collection.
+  */
+ @Override
+ public void dispose() throws Exception {
+     for (TaskState state : taskStates) {
+         state.discardState();
+     }
+     taskStates.clear();
+ }
+
+ @Override
+ public String toString() {
+     return "Savepoint(version=" + VERSION + ")";
+ }
+
+ /**
+  * Two savepoints are equal if they are of the same class, carry the same
+  * checkpoint ID and their task state collections are equal.
+  */
+ @Override
+ public boolean equals(Object o) {
+     if (o == this) {
+         return true;
+     }
+     if (o == null || o.getClass() != getClass()) {
+         return false;
+     }
+
+     final SavepointV1 other = (SavepointV1) o;
+     if (checkpointId != other.checkpointId) {
+         return false;
+     }
+     return getTaskStates().equals(other.getTaskStates());
+ }
+
+ @Override
+ public int hashCode() {
+     // fold the 64-bit checkpoint ID into 32 bits, then mix in the task states
+     final int idHash = (int) (checkpointId ^ (checkpointId >>> 32));
+     return 31 * idHash + taskStates.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
new file mode 100644
index 0000000..8e05b81
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Serializer for {@link SavepointV1} instances.
+ *
+ * <p>In contrast to previous savepoint versions, this serializer makes sure
+ * that no default Java serialization is used for serialization. Therefore, we
+ * don't rely on any involved Java classes to stay the same.
+ */
+class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
+
+ private static final byte NULL_HANDLE = 0;
+ private static final byte BYTE_STREAM_STATE_HANDLE = 1;
+ private static final byte FILE_STREAM_STATE_HANDLE = 2;
+ private static final byte KEY_GROUPS_HANDLE = 3;
+
+
+ public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
+
+ private SavepointV1Serializer() {
+ }
+
+ /**
+  * Writes the given savepoint to the stream without using Java serialization.
+  *
+  * <p>Layout: checkpoint ID (long), number of task states (int), then per task state:
+  * job vertex ID (lower part, upper part as longs), parallelism and max parallelism (ints),
+  * the sub task states (count, then per entry: subtask index, chain length, each chained
+  * stream state handle, duration) and the key-group state handles (count, then per entry:
+  * map key and handle).
+  *
+  * @param savepoint The savepoint to write.
+  * @param dos The stream to write to.
+  * @throws IOException If writing fails. Non-I/O failures are wrapped in an IOException.
+  */
+ @Override
+ public void serialize(SavepointV1 savepoint, DataOutputStream dos) throws IOException {
+     try {
+         dos.writeLong(savepoint.getCheckpointId());
+
+         Collection<TaskState> taskStates = savepoint.getTaskStates();
+         dos.writeInt(taskStates.size());
+
+         for (TaskState taskState : taskStates) {
+             // Vertex ID
+             dos.writeLong(taskState.getJobVertexID().getLowerPart());
+             dos.writeLong(taskState.getJobVertexID().getUpperPart());
+
+             // Parallelism
+             dos.writeInt(taskState.getParallelism());
+             dos.writeInt(taskState.getMaxParallelism());
+
+             // Sub task states
+             Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
+             dos.writeInt(subtaskStateMap.size());
+             for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
+                 dos.writeInt(entry.getKey());
+
+                 SubtaskState subtaskState = entry.getValue();
+                 ChainedStateHandle<StreamStateHandle> chainedStateHandle = subtaskState.getChainedStateHandle();
+                 int chainLength = chainedStateHandle.getLength();
+                 dos.writeInt(chainLength);
+                 for (int j = 0; j < chainLength; ++j) {
+                     serializeStreamStateHandle(chainedStateHandle.get(j), dos);
+                 }
+
+                 dos.writeLong(subtaskState.getDuration());
+             }
+
+             // Key group states
+             Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles = taskState.getKeyGroupsStateHandles();
+             dos.writeInt(keyGroupsStateHandles.size());
+             for (Map.Entry<Integer, KeyGroupsStateHandle> entry : keyGroupsStateHandles.entrySet()) {
+                 dos.writeInt(entry.getKey());
+                 serializeKeyGroupStateHandle(entry.getValue(), dos);
+             }
+         }
+     } catch (IOException e) {
+         // already the declared type: propagate as-is instead of wrapping it a second time
+         throw e;
+     } catch (Exception e) {
+         throw new IOException(e);
+     }
+ }
+
+ /**
+  * Reads a savepoint from the stream, consuming the fields in the order
+  * in which {@code serialize} writes them.
+  *
+  * @param dis The stream to read from.
+  * @return The savepoint rebuilt from the stream contents.
+  * @throws IOException If reading from the stream fails.
+  */
+ @Override
+ public SavepointV1 deserialize(DataInputStream dis) throws IOException {
+     final long checkpointId = dis.readLong();
+
+     final int taskStateCount = dis.readInt();
+     final List<TaskState> result = new ArrayList<>(taskStateCount);
+
+     for (int t = 0; t < taskStateCount; t++) {
+         // the two halves of the vertex ID are read in the order they were written
+         final long firstIdPart = dis.readLong();
+         final long secondIdPart = dis.readLong();
+         final JobVertexID vertexId = new JobVertexID(firstIdPart, secondIdPart);
+
+         final int parallelism = dis.readInt();
+         final int maxParallelism = dis.readInt();
+
+         final TaskState taskState = new TaskState(vertexId, parallelism, maxParallelism);
+         result.add(taskState);
+
+         // Sub task states
+         final int subtaskCount = dis.readInt();
+         for (int s = 0; s < subtaskCount; s++) {
+             final int subtaskIndex = dis.readInt();
+
+             final int chainLength = dis.readInt();
+             final List<StreamStateHandle> chain = new ArrayList<>(chainLength);
+             for (int c = 0; c < chainLength; ++c) {
+                 chain.add(deserializeStreamStateHandle(dis));
+             }
+
+             final long duration = dis.readLong();
+             final ChainedStateHandle<StreamStateHandle> chainedHandle = new ChainedStateHandle<>(chain);
+             taskState.putState(subtaskIndex, new SubtaskState(chainedHandle, duration));
+         }
+
+         // Key group states; null handles are read from the stream but not registered
+         final int keyGroupHandleCount = dis.readInt();
+         for (int g = 0; g < keyGroupHandleCount; g++) {
+             final int keyGroupIndex = dis.readInt();
+             final KeyGroupsStateHandle handle = deserializeKeyGroupStateHandle(dis);
+             if (handle == null) {
+                 continue;
+             }
+             taskState.putKeyedState(keyGroupIndex, handle);
+         }
+     }
+
+     return new SavepointV1(checkpointId, result);
+ }
+
+ /**
+  * Writes a key-groups state handle: a type tag, the start key group, the number of
+  * key groups, one offset per key group and finally the underlying stream state handle.
+  * A {@code null} handle is written as the null tag only.
+  *
+  * @param stateHandle The handle to write, may be null.
+  * @param dos The stream to write to.
+  * @throws IOException If writing fails.
+  */
+ public static void serializeKeyGroupStateHandle(KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
+     if (stateHandle == null) {
+         dos.writeByte(NULL_HANDLE);
+         return;
+     }
+
+     dos.writeByte(KEY_GROUPS_HANDLE);
+
+     final int firstKeyGroup = stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup();
+     dos.writeInt(firstKeyGroup);
+     dos.writeInt(stateHandle.getNumberOfKeyGroups());
+
+     for (int group : stateHandle.keyGroups()) {
+         final long offset = stateHandle.getOffsetForKeyGroup(group);
+         dos.writeLong(offset);
+     }
+
+     serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
+ }
+
+ /**
+  * Reads a key-groups state handle as written by {@code serializeKeyGroupStateHandle}.
+  *
+  * @param dis The stream to read from.
+  * @return The handle, or {@code null} if the null tag was read.
+  * @throws IOException If reading fails or the type tag is neither the null tag
+  *                     nor the key-groups tag.
+  */
+ public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
+     final int type = dis.readByte();
+     if (NULL_HANDLE == type) {
+         return null;
+     }
+     if (KEY_GROUPS_HANDLE != type) {
+         // fail fast on corrupt or misaligned input instead of interpreting arbitrary bytes
+         throw new IOException("Unknown implementation of KeyGroupsStateHandle, code: " + type);
+     }
+
+     int startKeyGroup = dis.readInt();
+     int numKeyGroups = dis.readInt();
+     // the range is inclusive on both ends, hence the -1
+     KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
+     long[] offsets = new long[numKeyGroups];
+     for (int i = 0; i < numKeyGroups; ++i) {
+         offsets[i] = dis.readLong();
+     }
+     KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
+     StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
+     return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ }
+
+ /**
+  * Writes a stream state handle as a type tag followed by its payload: nothing for
+  * {@code null}, the file path as UTF for a {@link FileStateHandle}, and a
+  * length-prefixed byte array for a {@link ByteStreamStateHandle}. The stream is
+  * flushed afterwards.
+  *
+  * @param stateHandle The handle to write, may be null.
+  * @param dos The stream to write to.
+  * @throws IOException If writing fails or the handle is of an unsupported type.
+  */
+ public static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+     if (stateHandle == null) {
+         dos.writeByte(NULL_HANDLE);
+
+     } else if (stateHandle instanceof FileStateHandle) {
+         dos.writeByte(FILE_STREAM_STATE_HANDLE);
+         FileStateHandle fileStateHandle = (FileStateHandle) stateHandle;
+         dos.writeUTF(fileStateHandle.getFilePath().toString());
+
+     } else if (stateHandle instanceof ByteStreamStateHandle) {
+         dos.writeByte(BYTE_STREAM_STATE_HANDLE);
+         ByteStreamStateHandle byteStreamStateHandle = (ByteStreamStateHandle) stateHandle;
+         // fetch the array once so the length prefix and the payload cannot disagree
+         byte[] internalData = byteStreamStateHandle.getData();
+         dos.writeInt(internalData.length);
+         dos.write(internalData);
+
+     } else {
+         throw new IOException("Unknown implementation of StreamStateHandle: " + stateHandle.getClass());
+     }
+
+     dos.flush();
+ }
+
+ /**
+  * Reads a stream state handle as written by {@code serializeStreamStateHandle}.
+  *
+  * @param dis The stream to read from.
+  * @return The handle, or {@code null} if the null tag was read.
+  * @throws IOException If reading fails, the stream ends before the announced number
+  *                     of bytes was read, or the type tag is unknown.
+  */
+ public static StreamStateHandle deserializeStreamStateHandle(DataInputStream dis) throws IOException {
+     int type = dis.read();
+     if (NULL_HANDLE == type) {
+         return null;
+     } else if (FILE_STREAM_STATE_HANDLE == type) {
+         String pathString = dis.readUTF();
+         return new FileStateHandle(new Path(pathString));
+     } else if (BYTE_STREAM_STATE_HANDLE == type) {
+         int numBytes = dis.readInt();
+         byte[] data = new byte[numBytes];
+         // read(byte[]) may return after a partial read; readFully blocks until the
+         // whole array is filled (or throws EOFException), keeping the stream aligned
+         dis.readFully(data);
+         return new ByteStreamStateHandle(data);
+     } else {
+         throw new IOException("Unknown implementation of StreamStateHandle, code: " + type);
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 9d47457..2217fd4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -22,8 +22,8 @@ import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import scala.Option;
@@ -140,7 +140,12 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
}
synchronized (statsLock) {
- long overallStateSize = checkpoint.getStateSize();
+ long overallStateSize;
+ try {
+ overallStateSize = checkpoint.getStateSize();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
// Operator stats
Map<JobVertexID, long[][]> statsForSubTasks = new HashMap<>();
@@ -421,7 +426,11 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
private class CheckpointSizeGauge implements Gauge<Long> {
@Override
public Long getValue() {
- return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize();
+ try {
+ return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getStateSize();
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 60fb45c..8849e93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -25,7 +25,9 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.SerializedValue;
import java.io.Serializable;
@@ -33,6 +35,7 @@ import java.net.URL;
import java.util.Collection;
import java.util.List;
+
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +91,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** The list of classpaths required to run this task. */
private final List<URL> requiredClasspaths;
- private final SerializedValue<StateHandle<?>> operatorState;
+ /** Handle to the non-partitioned state of the operator chain */
+ private final ChainedStateHandle<StreamStateHandle> operatorState;
+
+ /** Handle to the key-grouped state of the head operator in the chain */
+ private final List<KeyGroupsStateHandle> keyGroupState;
/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
@@ -114,7 +121,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
List<BlobKey> requiredJarFiles,
List<URL> requiredClasspaths,
int targetSlotNumber,
- SerializedValue<StateHandle<?>> operatorState) {
+ ChainedStateHandle<StreamStateHandle> operatorState,
+ List<KeyGroupsStateHandle> keyGroupState) {
checkArgument(indexInSubtaskGroup >= 0);
checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -139,6 +147,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
this.requiredClasspaths = checkNotNull(requiredClasspaths);
this.targetSlotNumber = targetSlotNumber;
this.operatorState = operatorState;
+ this.keyGroupState = keyGroupState;
}
public TaskDeploymentDescriptor(
@@ -178,6 +187,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
requiredJarFiles,
requiredClasspaths,
targetSlotNumber,
+ null,
null);
}
@@ -316,7 +326,11 @@ public final class TaskDeploymentDescriptor implements Serializable {
return strBuilder.toString();
}
- public SerializedValue<StateHandle<?>> getOperatorState() {
+ public ChainedStateHandle<StreamStateHandle> getOperatorState() {
return operatorState;
}
+
+ public List<KeyGroupsStateHandle> getKeyGroupState() {
+ return keyGroupState;
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 2f158fd..1eee9d4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -35,9 +35,12 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -166,14 +169,18 @@ public interface Environment {
void acknowledgeCheckpoint(long checkpointId);
/**
- * Confirms that the invokable has successfully completed all steps it needed to
- * to for the checkpoint with the give checkpoint-ID. This method does include
+ * Confirms that the invokable has successfully completed all required steps for
+ * the checkpoint with the given checkpoint-ID. This method does include
* the given state in the checkpoint.
*
* @param checkpointId The ID of the checkpoint.
- * @param state A handle to the state to be included in the checkpoint.
+ * @param chainedStateHandle Handle for the chained operator state
+ * @param keyGroupStateHandles Handles for key group state
*/
- void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state);
+ void acknowledgeCheckpoint(
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupStateHandles);
/**
* Marks task execution failed for an external reason (a reason other than the task code itself
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 5bab780..1981f5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -46,9 +46,10 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -134,8 +135,10 @@ public class Execution {
private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
- /** The state with which the execution attempt should start */
- private SerializedValue<StateHandle<?>> operatorState;
+ private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
+
+ private List<KeyGroupsStateHandle> keyGroupsStateHandles;
+
/** The execution context which is used to execute futures. */
private ExecutionContext executionContext;
@@ -215,6 +218,14 @@ public class Execution {
return this.stateTimestamps[state.ordinal()];
}
+ public ChainedStateHandle<StreamStateHandle> getChainedStateHandle() {
+ return chainedStateHandle;
+ }
+
+ public List<KeyGroupsStateHandle> getKeyGroupsStateHandles() {
+ return keyGroupsStateHandles;
+ }
+
public boolean isFinished() {
return state.isTerminal();
}
@@ -234,19 +245,22 @@ public class Execution {
partialInputChannelDeploymentDescriptors = null;
}
+ /**
+ * Sets the initial state for the execution. The serialized state is then shipped via the
+ * {@link TaskDeploymentDescriptor} to the TaskManagers.
+ *
+ * @param chainedStateHandle Chained operator state
+ * @param keyGroupsStateHandles Key-group state (= partitioned state)
+ */
public void setInitialState(
- SerializedValue<StateHandle<?>> initialState,
- Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
-
- if (initialKvState != null && initialKvState.size() > 0) {
- throw new UnsupportedOperationException("Error: inconsistent handling of key/value state snapshots");
- }
+ ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+ List<KeyGroupsStateHandle> keyGroupsStateHandles) {
if (state != ExecutionState.CREATED) {
throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
}
-
- this.operatorState = initialState;
+ this.chainedStateHandle = chainedStateHandle;
+ this.keyGroupsStateHandles = keyGroupsStateHandles;
}
// --------------------------------------------------------------------------------------------
@@ -373,7 +387,8 @@ public class Execution {
final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
attemptId,
slot,
- operatorState,
+ chainedStateHandle,
+ keyGroupsStateHandles,
attemptNumber);
// register this execution at the execution graph, to receive call backs
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index b15f851..f3a8b6d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -42,9 +42,11 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.slf4j.Logger;
import scala.concurrent.duration.FiniteDuration;
@@ -183,14 +185,14 @@ public class ExecutionVertex {
return this.jobVertex.getParallelism();
}
- public int getParallelSubtaskIndex() {
- return this.subTaskIndex;
- }
-
public int getMaxParallelism() {
return this.jobVertex.getMaxParallelism();
}
+ public int getParallelSubtaskIndex() {
+ return this.subTaskIndex;
+ }
+
public int getNumberOfInputs() {
return this.inputEdges.length;
}
@@ -229,7 +231,7 @@ public class ExecutionVertex {
public InstanceConnectionInfo getCurrentAssignedResourceLocation() {
return currentExecution.getAssignedResourceLocation();
}
-
+
public Execution getPriorExecutionAttempt(int attemptNumber) {
if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
return priorExecutions.get(attemptNumber);
@@ -238,7 +240,7 @@ public class ExecutionVertex {
throw new IllegalArgumentException("attempt does not exist");
}
}
-
+
public ExecutionGraph getExecutionGraph() {
return this.jobVertex.getGraph();
}
@@ -275,7 +277,7 @@ public class ExecutionVertex {
this.inputEdges[inputNumber] = edges;
// add the consumers to the source
- // for now (until the receiver initiated handshake is in place), we need to register the
+ // for now (until the receiver initiated handshake is in place), we need to register the
// edges as the execution graph
for (ExecutionEdge ee : edges) {
ee.getSource().addConsumer(ee, consumerNumber);
@@ -486,11 +488,11 @@ public class ExecutionVertex {
ExecutionAttemptID attemptID,
ActorGateway sender) {
Execution exec = getCurrentExecutionAttempt();
-
+
// check that this is for the correct execution attempt
if (exec != null && exec.getAttemptId().equals(attemptID)) {
SimpleSlot slot = exec.getAssignedResource();
-
+
// send only if we actually have a target
if (slot != null) {
ActorGateway gateway = slot.getInstance().getActorGateway();
@@ -517,7 +519,7 @@ public class ExecutionVertex {
return false;
}
}
-
+
/**
* Schedules or updates the consumer tasks of the result partition with the given ID.
*/
@@ -633,13 +635,14 @@ public class ExecutionVertex {
/**
* Creates a task deployment descriptor to deploy a subtask to the given target slot.
- *
+ *
* TODO: This should actually be in the EXECUTION
*/
TaskDeploymentDescriptor createDeploymentDescriptor(
ExecutionAttemptID executionId,
SimpleSlot targetSlot,
- SerializedValue<StateHandle<?>> operatorState,
+ ChainedStateHandle<StreamStateHandle> operatorState,
+ List<KeyGroupsStateHandle> keyGroupStates,
int attemptNumber) {
// Produced intermediate results
@@ -690,7 +693,8 @@ public class ExecutionVertex {
jarFiles,
classpaths,
targetSlot.getRoot().getSlotNumber(),
- operatorState);
+ operatorState,
+ keyGroupStates);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
index 4786388..a623295 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java
@@ -249,7 +249,7 @@ public class JobVertex implements java.io.Serializable {
/**
* Sets the maximum parallelism for the task.
*
- * @param maxParallelism The maximum parallelism to be set.
+ * @param maxParallelism The maximum parallelism to be set. Must be between 1 and Short.MAX_VALUE.
*/
public void setMaxParallelism(int maxParallelism) {
org.apache.flink.util.Preconditions.checkArgument(
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index f8bba1a..cab7ed6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -18,21 +18,27 @@
package org.apache.flink.runtime.jobgraph.tasks;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+
+import java.util.List;
/**
* This interface must be implemented by any invokable that has recoverable state and participates
* in checkpointing.
*/
-public interface StatefulTask<T extends StateHandle<?>> {
+public interface StatefulTask {
/**
* Sets the initial state of the operator, upon recovery. The initial state is typically
* a snapshot of the state from a previous execution.
*
- * @param stateHandle The handle to the state.
+ * @param chainedState Handle for the chained operator states.
+ * @param keyGroupsState Handle for key group states.
*/
- void setInitialState(T stateHandle) throws Exception;
+ void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
/**
* This method is either called directly and asynchronously by the checkpoint
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
index 7f7c5fe..ec05f1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphStore.java
@@ -25,8 +25,8 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -98,7 +98,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
public ZooKeeperSubmittedJobGraphStore(
CuratorFramework client,
String currentJobsPath,
- StateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
+ RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage) throws Exception {
checkNotNull(currentJobsPath, "Current jobs path");
checkNotNull(stateStorage, "State storage");
@@ -153,7 +153,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
synchronized (cacheLock) {
verifyIsRunning();
- List<Tuple2<StateHandle<SubmittedJobGraph>, String>> submitted;
+ List<Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String>> submitted;
while (true) {
try {
@@ -168,10 +168,8 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
if (submitted.size() != 0) {
List<SubmittedJobGraph> jobGraphs = new ArrayList<>(submitted.size());
- for (Tuple2<StateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
- SubmittedJobGraph jobGraph = jobStateHandle
- .f0.getState(ClassLoader.getSystemClassLoader());
-
+ for (Tuple2<RetrievableStateHandle<SubmittedJobGraph>, String> jobStateHandle : submitted) {
+ SubmittedJobGraph jobGraph = jobStateHandle.f0.retrieveState();
addedJobGraphs.add(jobGraph.getJobId());
jobGraphs.add(jobGraph);
@@ -196,11 +194,7 @@ public class ZooKeeperSubmittedJobGraphStore implements SubmittedJobGraphStore {
verifyIsRunning();
try {
- StateHandle<SubmittedJobGraph> jobStateHandle = jobGraphsInZooKeeper.get(path);
-
- SubmittedJobGraph jobGraph = jobStateHandle
- .getState(ClassLoader.getSystemClassLoader());
-
+ SubmittedJobGraph jobGraph = jobGraphsInZooKeeper.get(path).retrieveState();
addedJobGraphs.add(jobGraph.getJobId());
LOG.info("Recovered {}.", jobGraph);
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index a4d438b..0c56603 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -20,51 +20,50 @@ package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.util.List;
/**
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
* {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
* individual task is completed.
*
- * This message may carry the handle to the task's state.
+ * <p>This message may carry the handle to the task's chained operator state and the key group
+ * state.
*/
public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
private static final long serialVersionUID = -7606214777192401493L;
- private final SerializedValue<StateHandle<?>> state;
+ private final ChainedStateHandle<StreamStateHandle> stateHandle;
- /**
- * The state size. This is an optimization in order to not deserialize the
- * state handle at the checkpoint coordinator when gathering stats about
- * the checkpoints.
- */
- private final long stateSize;
+ private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
- this(job, taskExecutionId, checkpointId, null, 0);
+ this(job, taskExecutionId, checkpointId, null, null);
}
public AcknowledgeCheckpoint(
- JobID job,
- ExecutionAttemptID taskExecutionId,
- long checkpointId,
- SerializedValue<StateHandle<?>> state,
- long stateSize) {
+ JobID job,
+ ExecutionAttemptID taskExecutionId,
+ long checkpointId,
+ ChainedStateHandle<StreamStateHandle> state,
+ List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
super(job, taskExecutionId, checkpointId);
- this.state = state;
- this.stateSize = stateSize;
+ this.stateHandle = state;
+ this.keyGroupsStateHandle = keyGroupStateAndSizes;
}
- public SerializedValue<StateHandle<?>> getState() {
- return state;
+ public ChainedStateHandle<StreamStateHandle> getStateHandle() {
+ return stateHandle;
}
- public long getStateSize() {
- return stateSize;
+ public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
+ return keyGroupsStateHandle;
}
// --------------------------------------------------------------------------------------------
@@ -76,8 +75,10 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
}
else if (o instanceof AcknowledgeCheckpoint) {
AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
- return super.equals(o) && (this.state == null ? that.state == null :
- (that.state != null && this.state.equals(that.state)));
+ return super.equals(o) &&
+ (this.stateHandle == null ? that.stateHandle == null : (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
+ (this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
+
}
else {
return false;
@@ -86,7 +87,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
@Override
public String toString() {
- return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s",
- getCheckpointId(), getJob(), getTaskExecutionId(), state);
+ return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s keyGroupState=%s",
+ getCheckpointId(), getJob(), getTaskExecutionId(), stateHandle, keyGroupsStateHandle);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
index 609158d..5966c95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
@@ -20,27 +20,26 @@ package org.apache.flink.runtime.state;
import java.io.Closeable;
import java.io.IOException;
-import java.io.Serializable;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/**
* A simple base for closable handles.
- *
+ *
* Offers to register a stream (or other closable object) that close calls are delegated to if
* the handle is closed or was already closed.
*/
-public abstract class AbstractCloseableHandle implements Closeable, Serializable {
+public abstract class AbstractCloseableHandle implements Closeable, StateObject {
/** Serial Version UID must be constant to maintain format compatibility */
private static final long serialVersionUID = 1L;
/** To atomically update the "isClosed" field without needing to add a member class like "AtomicBoolean" */
- private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER =
+ private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER =
AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
// ------------------------------------------------------------------------
- /** The closeable to close if this handle is closed late */
+ /** The closeable to close if this handle is closed late */
private transient volatile Closeable toClose;
/** Flag to remember if this handle was already closed */
@@ -53,7 +52,7 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
if (toClose == null) {
return;
}
-
+
// NOTE: The order of operations matters here:
// (1) first setting the closeable
// (2) checking the flag.
@@ -73,16 +72,16 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
/**
* Closes the handle.
- *
+ *
* <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)},
* then this will be closed.
- *
+ *
* <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future,
* it will immediately be closed and that method will throw an exception.
- *
+ *
* @throws IOException Exceptions occurring while closing an already registered {@code Closeable}
* are forwarded.
- *
+ *
* @see #registerCloseable(Closeable)
*/
@Override
@@ -106,7 +105,7 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
/**
* Checks whether this handle has been closed.
- *
+ *
* @return True if the handle is closed, false otherwise.
*/
public boolean isClosed() {
@@ -116,7 +115,7 @@ public abstract class AbstractCloseableHandle implements Closeable, Serializable
/**
* This method checks whether the handle is closed and throws an exception if it is closed.
* If the handle is not closed, this method does nothing.
- *
+ *
* @throws IOException Thrown, if the handle has been closed.
*/
public void ensureNotClosed() throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 6fc9475..b2cde22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -33,17 +33,12 @@ import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -122,7 +117,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
*/
public abstract void close() throws Exception;
- public void dispose() {
+ public void discardState() throws Exception {
if (kvStateRegistry != null) {
kvStateRegistry.unregisterAll();
}
@@ -418,37 +413,6 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
public abstract CheckpointStateOutputStream createCheckpointStateOutputStream(
long checkpointID, long timestamp) throws Exception;
- /**
- * Creates a {@link DataOutputView} stream that writes into the state of the given checkpoint.
- * When the stream is closes, it returns a state handle that can retrieve the state back.
- *
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @return An DataOutputView stream that writes state for the given checkpoint.
- *
- * @throws Exception Exceptions may occur while creating the stream and should be forwarded.
- */
- public CheckpointStateOutputView createCheckpointStateOutputView(
- long checkpointID, long timestamp) throws Exception {
- return new CheckpointStateOutputView(createCheckpointStateOutputStream(checkpointID, timestamp));
- }
-
- /**
- * Writes the given state into the checkpoint, and returns a handle that can retrieve the state back.
- *
- * @param state The state to be checkpointed.
- * @param checkpointID The ID of the checkpoint.
- * @param timestamp The timestamp of the checkpoint.
- * @param <S> The type of the state.
- *
- * @return A state handle that can retrieve the checkpoined state.
- *
- * @throws Exception Exceptions may occur during serialization / storing the state and should be forwarded.
- */
- public abstract <S extends Serializable> StateHandle<S> checkpointStateSerializable(
- S state, long checkpointID, long timestamp) throws Exception;
-
-
// ------------------------------------------------------------------------
// Checkpoint state output stream
// ------------------------------------------------------------------------
@@ -456,7 +420,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
/**
* A dedicated output stream that produces a {@link StreamStateHandle} when closed.
*/
- public static abstract class CheckpointStateOutputStream extends OutputStream {
+ public static abstract class CheckpointStateOutputStream extends FSDataOutputStream {
/**
* Closes the stream and gets a state handle that can create an input stream
@@ -467,67 +431,4 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
*/
public abstract StreamStateHandle closeAndGetHandle() throws IOException;
}
-
- /**
- * A dedicated DataOutputView stream that produces a {@code StateHandle<DataInputView>} when closed.
- */
- public static final class CheckpointStateOutputView extends DataOutputViewStreamWrapper {
-
- private final CheckpointStateOutputStream out;
-
- public CheckpointStateOutputView(CheckpointStateOutputStream out) {
- super(out);
- this.out = out;
- }
-
- /**
- * Closes the stream and gets a state handle that can create a DataInputView.
- * producing the data written to this stream.
- *
- * @return A state handle that can create an input stream producing the data written to this stream.
- * @throws IOException Thrown, if the stream cannot be closed.
- */
- public StateHandle<DataInputView> closeAndGetHandle() throws IOException {
- return new DataInputViewHandle(out.closeAndGetHandle());
- }
-
- @Override
- public void close() throws IOException {
- out.close();
- }
- }
-
- /**
- * Simple state handle that resolved a {@link DataInputView} from a StreamStateHandle.
- */
- private static final class DataInputViewHandle implements StateHandle<DataInputView> {
-
- private static final long serialVersionUID = 2891559813513532079L;
-
- private final StreamStateHandle stream;
-
- private DataInputViewHandle(StreamStateHandle stream) {
- this.stream = stream;
- }
-
- @Override
- public DataInputView getState(ClassLoader userCodeClassLoader) throws Exception {
- return new DataInputViewStreamWrapper(stream.getState(userCodeClassLoader));
- }
-
- @Override
- public void discardState() throws Exception {
- stream.discardState();
- }
-
- @Override
- public long getStateSize() throws Exception {
- return stream.getStateSize();
- }
-
- @Override
- public void close() throws IOException {
- stream.close();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
deleted file mode 100644
index fee1efe..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousStateHandle.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-/**
- * {@link StateHandle} that can asynchronously materialize the state that it represents. Instead
- * of representing a materialized handle to state this would normally hold the (immutable) state
- * internally and can materialize it if requested.
- */
-public abstract class AsynchronousStateHandle<T> implements StateHandle<T> {
- private static final long serialVersionUID = 1L;
-
- /**
- * Materializes the state held by this {@code AsynchronousStateHandle}.
- */
- public abstract StateHandle<T> materialize() throws Exception;
-
- @Override
- public final T getState(ClassLoader userCodeClassLoader) throws Exception {
- throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
- }
-
- @Override
- public final void discardState() throws Exception {
- throw new UnsupportedOperationException("This must not be called. This is likely an internal bug.");
- }
-}