You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/30 12:47:57 UTC
[07/10] flink git commit: [FLINK-4379] [checkpoints] Introduce
rescalable operator state
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index f5e3618..7e4eded 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.checkpoint;
import com.google.common.collect.Iterables;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.util.Preconditions;
@@ -47,27 +49,36 @@ public class TaskState implements StateObject {
/** handles to non-partitioned states, subtaskindex -> subtaskstate */
private final Map<Integer, SubtaskState> subtaskStates;
- /** handles to partitioned states, subtaskindex -> keyed state */
+ /** handles to partitionable states, subtaskindex -> partitionable state */
+ private final Map<Integer, ChainedStateHandle<OperatorStateHandle>> partitionableStates;
+
+ /** handles to key-partitioned states, subtaskindex -> keyed state */
private final Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles;
+
/** parallelism of the operator when it was checkpointed */
private final int parallelism;
/** maximum parallelism of the operator when the job was first created */
private final int maxParallelism;
- public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism) {
+ private final int chainLength;
+
+ public TaskState(JobVertexID jobVertexID, int parallelism, int maxParallelism, int chainLength) {
Preconditions.checkArgument(
parallelism <= maxParallelism,
"Parallelism " + parallelism + " is not smaller or equal to max parallelism " + maxParallelism + ".");
+ Preconditions.checkArgument(chainLength > 0, "There has to be at least one operator in the operator chain.");
this.jobVertexID = jobVertexID;
this.subtaskStates = new HashMap<>(parallelism);
+ this.partitionableStates = new HashMap<>(parallelism);
this.keyGroupsStateHandles = new HashMap<>(parallelism);
this.parallelism = parallelism;
this.maxParallelism = maxParallelism;
+ this.chainLength = chainLength;
}
public JobVertexID getJobVertexID() {
@@ -85,6 +96,20 @@ public class TaskState implements StateObject {
}
}
+	/**
+	 * Stores the partitionable operator state of the sub task with the given index.
+	 *
+	 * @param subtaskIndex index of the sub task, must be in {@code [0, parallelism)}
+	 * @param partitionableState chained handle to the partitionable operator state, must not be null
+	 * @throws IndexOutOfBoundsException if the sub task index is outside of {@code [0, parallelism)}
+	 */
+	public void putPartitionableState(
+			int subtaskIndex,
+			ChainedStateHandle<OperatorStateHandle> partitionableState) {
+
+		Preconditions.checkNotNull(partitionableState);
+
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			// report the bound that is actually checked, not the number of states stored so far
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+				" exceeds the maximum number of sub tasks " + parallelism);
+		} else {
+			partitionableStates.put(subtaskIndex, partitionableState);
+		}
+	}
+
public void putKeyedState(int subtaskIndex, KeyGroupsStateHandle keyGroupsStateHandle) {
Preconditions.checkNotNull(keyGroupsStateHandle);
@@ -106,6 +131,15 @@ public class TaskState implements StateObject {
}
}
+	/**
+	 * Returns the partitionable operator state stored for the sub task with the given index.
+	 *
+	 * @param subtaskIndex index of the sub task, must be in {@code [0, parallelism)}
+	 * @return the stored chained state handle, or {@code null} if nothing was stored for that index
+	 * @throws IndexOutOfBoundsException if the sub task index is outside of {@code [0, parallelism)}
+	 */
+	public ChainedStateHandle<OperatorStateHandle> getPartitionableState(int subtaskIndex) {
+		if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
+			// report the bound that is actually checked, not the number of states stored so far
+			throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
+				" exceeds the maximum number of sub tasks " + parallelism);
+		} else {
+			return partitionableStates.get(subtaskIndex);
+		}
+	}
+
public KeyGroupsStateHandle getKeyGroupState(int subtaskIndex) {
if (subtaskIndex < 0 || subtaskIndex >= parallelism) {
throw new IndexOutOfBoundsException("The given sub task index " + subtaskIndex +
@@ -131,6 +165,10 @@ public class TaskState implements StateObject {
return maxParallelism;
}
+	/**
+	 * Returns the length of the operator chain this task state was created with.
+	 *
+	 * @return the chain length handed to the constructor (always greater than zero)
+	 */
+	public int getChainLength() {
+		return this.chainLength;
+	}
+
public Collection<KeyGroupsStateHandle> getKeyGroupStates() {
return keyGroupsStateHandles.values();
}
@@ -147,7 +185,7 @@ public class TaskState implements StateObject {
@Override
public void discardState() throws Exception {
StateUtil.bestEffortDiscardAllStateObjects(
- Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
+ Iterables.concat(subtaskStates.values(), partitionableStates.values(), keyGroupsStateHandles.values()));
}
@@ -156,11 +194,19 @@ public class TaskState implements StateObject {
long result = 0L;
for (int i = 0; i < parallelism; i++) {
- if (subtaskStates.get(i) != null) {
- result += subtaskStates.get(i).getStateSize();
+ SubtaskState subtaskState = subtaskStates.get(i);
+ if (subtaskState != null) {
+ result += subtaskState.getStateSize();
+ }
+
+ ChainedStateHandle<OperatorStateHandle> partitionableState = partitionableStates.get(i);
+ if (partitionableState != null) {
+ result += partitionableState.getStateSize();
}
- if (keyGroupsStateHandles.get(i) != null) {
- result += keyGroupsStateHandles.get(i).getStateSize();
+
+ KeyGroupsStateHandle keyGroupsState = keyGroupsStateHandles.get(i);
+ if (keyGroupsState != null) {
+ result += keyGroupsState.getStateSize();
}
}
@@ -172,8 +218,11 @@ public class TaskState implements StateObject {
if (obj instanceof TaskState) {
TaskState other = (TaskState) obj;
- return jobVertexID.equals(other.jobVertexID) && parallelism == other.parallelism &&
- subtaskStates.equals(other.subtaskStates) && keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
+ return jobVertexID.equals(other.jobVertexID)
+ && parallelism == other.parallelism
+ && subtaskStates.equals(other.subtaskStates)
+ && partitionableStates.equals(other.partitionableStates)
+ && keyGroupsStateHandles.equals(other.keyGroupsStateHandles);
} else {
return false;
}
@@ -181,13 +230,7 @@ public class TaskState implements StateObject {
@Override
public int hashCode() {
- return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, keyGroupsStateHandles);
- }
-
- @Override
- public void close() throws IOException {
- StateUtil.bestEffortCloseAllStateObjects(
- Iterables.concat(subtaskStates.values(), keyGroupsStateHandles.values()));
+ return parallelism + 31 * Objects.hash(jobVertexID, subtaskStates, partitionableStates, keyGroupsStateHandles);
}
public Map<Integer, SubtaskState> getSubtaskStates() {
@@ -197,4 +240,8 @@ public class TaskState implements StateObject {
public Map<Integer, KeyGroupsStateHandle> getKeyGroupsStateHandles() {
return Collections.unmodifiableMap(keyGroupsStateHandles);
}
+
+	/**
+	 * Returns a read-only view of the partitionable operator states, keyed by sub task index.
+	 *
+	 * <p>The view is unmodifiable, like the one returned by {@link #getKeyGroupsStateHandles()};
+	 * use {@link #putPartitionableState(int, ChainedStateHandle)} to add state.
+	 *
+	 * @return unmodifiable map from sub task index to chained partitionable state handle
+	 */
+	public Map<Integer, ChainedStateHandle<OperatorStateHandle>> getPartitionableStates() {
+		return Collections.unmodifiableMap(partitionableStates);
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index f07f44f..536062a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
@@ -35,6 +36,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -51,6 +53,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
private static final byte BYTE_STREAM_STATE_HANDLE = 1;
private static final byte FILE_STREAM_STATE_HANDLE = 2;
private static final byte KEY_GROUPS_HANDLE = 3;
+ private static final byte PARTITIONABLE_OPERATOR_STATE_HANDLE = 4;
public static final SavepointV1Serializer INSTANCE = new SavepointV1Serializer();
@@ -75,8 +78,9 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
int parallelism = taskState.getParallelism();
dos.writeInt(parallelism);
dos.writeInt(taskState.getMaxParallelism());
+ dos.writeInt(taskState.getChainLength());
- // Sub task states
+ // Sub task non-partitionable states
Map<Integer, SubtaskState> subtaskStateMap = taskState.getSubtaskStates();
dos.writeInt(subtaskStateMap.size());
for (Map.Entry<Integer, SubtaskState> entry : subtaskStateMap.entrySet()) {
@@ -93,7 +97,22 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
dos.writeLong(subtaskState.getDuration());
}
+ // Sub task partitionable states
+ Map<Integer, ChainedStateHandle<OperatorStateHandle>> partitionableStatesMap = taskState.getPartitionableStates();
+ dos.writeInt(partitionableStatesMap.size());
+ for (Map.Entry<Integer, ChainedStateHandle<OperatorStateHandle>> entry : partitionableStatesMap.entrySet()) {
+ dos.writeInt(entry.getKey());
+
+ ChainedStateHandle<OperatorStateHandle> chainedStateHandle = entry.getValue();
+ dos.writeInt(chainedStateHandle.getLength());
+ for (int j = 0; j < chainedStateHandle.getLength(); ++j) {
+ OperatorStateHandle stateHandle = chainedStateHandle.get(j);
+ serializePartitionableStateHandle(stateHandle, dos);
+ }
+ }
+
+ // Keyed state
Map<Integer, KeyGroupsStateHandle> keyGroupsStateHandles = taskState.getKeyGroupsStateHandles();
dos.writeInt(keyGroupsStateHandles.size());
for (Map.Entry<Integer, KeyGroupsStateHandle> entry : keyGroupsStateHandles.entrySet()) {
@@ -119,9 +138,10 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
JobVertexID jobVertexId = new JobVertexID(dis.readLong(), dis.readLong());
int parallelism = dis.readInt();
int maxParallelism = dis.readInt();
+ int chainLength = dis.readInt();
// Add task state
- TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism);
+ TaskState taskState = new TaskState(jobVertexId, parallelism, maxParallelism, chainLength);
taskStates.add(taskState);
// Sub task states
@@ -142,6 +162,24 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
taskState.putState(subtaskIndex, subtaskState);
}
+ int numPartitionableOpStates = dis.readInt();
+
+ for (int j = 0; j < numPartitionableOpStates; j++) {
+ int subtaskIndex = dis.readInt();
+ int chainedStateHandleSize = dis.readInt();
+ List<OperatorStateHandle> streamStateHandleList = new ArrayList<>(chainedStateHandleSize);
+
+ for (int k = 0; k < chainedStateHandleSize; ++k) {
+ OperatorStateHandle streamStateHandle = deserializePartitionableStateHandle(dis);
+ streamStateHandleList.add(streamStateHandle);
+ }
+
+ ChainedStateHandle<OperatorStateHandle> chainedStateHandle =
+ new ChainedStateHandle<>(streamStateHandleList);
+
+ taskState.putPartitionableState(subtaskIndex, chainedStateHandle);
+ }
+
// Key group states
int numKeyGroupStates = dis.readInt();
for (int j = 0; j < numKeyGroupStates; j++) {
@@ -157,7 +195,9 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
return new SavepointV1(checkpointId, taskStates);
}
- public static void serializeKeyGroupStateHandle(KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
+ public static void serializeKeyGroupStateHandle(
+ KeyGroupsStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
if (stateHandle != null) {
dos.writeByte(KEY_GROUPS_HANDLE);
dos.writeInt(stateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup());
@@ -172,10 +212,10 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
}
public static KeyGroupsStateHandle deserializeKeyGroupStateHandle(DataInputStream dis) throws IOException {
- int type = dis.readByte();
+ final int type = dis.readByte();
if (NULL_HANDLE == type) {
return null;
- } else {
+ } else if (KEY_GROUPS_HANDLE == type) {
int startKeyGroup = dis.readInt();
int numKeyGroups = dis.readInt();
KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, startKeyGroup + numKeyGroups - 1);
@@ -186,6 +226,53 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(keyGroupRange, offsets);
StreamStateHandle stateHandle = deserializeStreamStateHandle(dis);
return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
+ } else {
+ throw new IllegalStateException("Reading invalid KeyGroupsStateHandle, type: " + type);
+ }
+ }
+
+	/**
+	 * Writes the given partitionable operator state handle to the stream.
+	 *
+	 * <p>A {@code null} handle is written as the single marker byte {@code NULL_HANDLE}. Otherwise
+	 * the marker {@code PARTITIONABLE_OPERATOR_STATE_HANDLE} is followed by the number of named
+	 * states, then per state its name, the number of offsets and the offsets themselves, and
+	 * finally the delegate stream state handle.
+	 *
+	 * @param stateHandle the handle to serialize, may be null
+	 * @param dos the stream to write to
+	 * @throws IOException if writing to the stream fails
+	 */
+	public static void serializePartitionableStateHandle(
+			OperatorStateHandle stateHandle, DataOutputStream dos) throws IOException {
+
+		// a missing handle is encoded by the null marker byte alone
+		if (stateHandle == null) {
+			dos.writeByte(NULL_HANDLE);
+			return;
+		}
+
+		dos.writeByte(PARTITIONABLE_OPERATOR_STATE_HANDLE);
+
+		Map<String, long[]> offsetsByStateName = stateHandle.getStateNameToPartitionOffsets();
+		dos.writeInt(offsetsByStateName.size());
+		for (Map.Entry<String, long[]> nameAndOffsets : offsetsByStateName.entrySet()) {
+			dos.writeUTF(nameAndOffsets.getKey());
+
+			long[] partitionOffsets = nameAndOffsets.getValue();
+			dos.writeInt(partitionOffsets.length);
+			for (long offset : partitionOffsets) {
+				dos.writeLong(offset);
+			}
+		}
+
+		serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
+	}
+
+	/**
+	 * Reads a partitionable operator state handle from the stream, mirroring the layout produced by
+	 * {@code serializePartitionableStateHandle}.
+	 *
+	 * @param dis the stream to read from
+	 * @return the deserialized handle, or {@code null} if the null marker was read
+	 * @throws IOException if reading from the stream fails
+	 * @throws IllegalStateException if the leading type byte is neither the null marker nor the
+	 *         partitionable operator state marker
+	 */
+	public static OperatorStateHandle deserializePartitionableStateHandle(
+			DataInputStream dis) throws IOException {
+
+		// the leading byte tells whether a handle follows at all
+		final byte typeTag = dis.readByte();
+		if (typeTag == NULL_HANDLE) {
+			return null;
+		} else if (typeTag == PARTITIONABLE_OPERATOR_STATE_HANDLE) {
+			final int numStateNames = dis.readInt();
+			final Map<String, long[]> offsetsByStateName = new HashMap<>(numStateNames);
+			for (int n = 0; n < numStateNames; ++n) {
+				final String stateName = dis.readUTF();
+				final int numOffsets = dis.readInt();
+				final long[] partitionOffsets = new long[numOffsets];
+				for (int p = 0; p < numOffsets; ++p) {
+					partitionOffsets[p] = dis.readLong();
+				}
+				offsetsByStateName.put(stateName, partitionOffsets);
+			}
+			final StreamStateHandle delegate = deserializeStreamStateHandle(dis);
+			return new OperatorStateHandle(delegate, offsetsByStateName);
+		} else {
+			throw new IllegalStateException("Reading invalid OperatorStateHandle, type: " + typeTag);
 		}
 	}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index ca976e4..7bbdb2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.SerializedValue;
@@ -100,6 +101,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
/** Handle to the key-grouped state of the head operator in the chain */
private final List<KeyGroupsStateHandle> keyGroupState;
+ private final List<Collection<OperatorStateHandle>> partitionableOperatorState;
+
/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
@@ -107,26 +110,27 @@ public final class TaskDeploymentDescriptor implements Serializable {
* Constructs a task deployment descriptor.
*/
public TaskDeploymentDescriptor(
- JobID jobID,
- String jobName,
- JobVertexID vertexID,
- ExecutionAttemptID executionId,
- SerializedValue<ExecutionConfig> serializedExecutionConfig,
- String taskName,
- int numberOfKeyGroups,
- int indexInSubtaskGroup,
- int numberOfSubtasks,
- int attemptNumber,
- Configuration jobConfiguration,
- Configuration taskConfiguration,
- String invokableClassName,
- List<ResultPartitionDeploymentDescriptor> producedPartitions,
- List<InputGateDeploymentDescriptor> inputGates,
- List<BlobKey> requiredJarFiles,
- List<URL> requiredClasspaths,
- int targetSlotNumber,
- ChainedStateHandle<StreamStateHandle> operatorState,
- List<KeyGroupsStateHandle> keyGroupState) {
+ JobID jobID,
+ String jobName,
+ JobVertexID vertexID,
+ ExecutionAttemptID executionId,
+ SerializedValue<ExecutionConfig> serializedExecutionConfig,
+ String taskName,
+ int numberOfKeyGroups,
+ int indexInSubtaskGroup,
+ int numberOfSubtasks,
+ int attemptNumber,
+ Configuration jobConfiguration,
+ Configuration taskConfiguration,
+ String invokableClassName,
+ List<ResultPartitionDeploymentDescriptor> producedPartitions,
+ List<InputGateDeploymentDescriptor> inputGates,
+ List<BlobKey> requiredJarFiles,
+ List<URL> requiredClasspaths,
+ int targetSlotNumber,
+ ChainedStateHandle<StreamStateHandle> operatorState,
+ List<KeyGroupsStateHandle> keyGroupState,
+ List<Collection<OperatorStateHandle>> partitionableOperatorStateHandles) {
checkArgument(indexInSubtaskGroup >= 0);
checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -153,6 +157,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
this.targetSlotNumber = targetSlotNumber;
this.operatorState = operatorState;
this.keyGroupState = keyGroupState;
+ this.partitionableOperatorState = partitionableOperatorStateHandles;
}
public TaskDeploymentDescriptor(
@@ -195,6 +200,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
requiredClasspaths,
targetSlotNumber,
null,
+ null,
null);
}
@@ -347,4 +353,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
public List<KeyGroupsStateHandle> getKeyGroupState() {
return keyGroupState;
}
+
+	/**
+	 * Returns the partitionable operator state handles this descriptor was constructed with.
+	 *
+	 * @return the handles passed to the constructor; {@code null} if none were given
+	 */
+	public List<Collection<OperatorStateHandle>> getPartitionableOperatorState() {
+		return this.partitionableOperatorState;
+	}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 273c0d9..f6cde95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -34,13 +34,10 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
@@ -187,12 +184,8 @@ public interface Environment {
* the checkpoint with the give checkpoint-ID. This method does include
* the given state in the checkpoint.
*
- * @param checkpointId
- * The ID of the checkpoint.
- * @param chainedStateHandle
- * Handle for the chained operator state
- * @param keyGroupStateHandles
- * Handles for key group state
+ * @param checkpointId The ID of the checkpoint.
+ * @param checkpointStateHandles All state handles for the checkpointed state
* @param synchronousDurationMillis
* The duration (in milliseconds) of the synchronous part of the operator checkpoint
* @param asynchronousDurationMillis
@@ -204,8 +197,7 @@ public interface Environment {
*/
void acknowledgeCheckpoint(
long checkpointId,
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupStateHandles,
+ CheckpointStateHandles checkpointStateHandles,
long synchronousDurationMillis,
long asynchronousDurationMillis,
long bytesBufferedInAlignment,
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 912ff10..b92e3af 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SlotProvider;
@@ -46,6 +47,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -56,6 +58,7 @@ import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
@@ -134,6 +137,8 @@ public class Execution {
private ChainedStateHandle<StreamStateHandle> chainedStateHandle;
+ private List<Collection<OperatorStateHandle>> chainedPartitionableStateHandle;
+
private List<KeyGroupsStateHandle> keyGroupsStateHandles;
@@ -223,6 +228,10 @@ public class Execution {
return keyGroupsStateHandles;
}
+	/**
+	 * Returns the partitionable operator state handles assigned to this execution.
+	 *
+	 * @return the handles stored by {@code setInitialState}; {@code null} if none were assigned
+	 */
+	public List<Collection<OperatorStateHandle>> getChainedPartitionableStateHandle() {
+		return this.chainedPartitionableStateHandle;
+	}
+
public boolean isFinished() {
return state.isTerminal();
}
@@ -246,18 +255,19 @@ public class Execution {
* Sets the initial state for the execution. The serialized state is then shipped via the
* {@link TaskDeploymentDescriptor} to the TaskManagers.
*
- * @param chainedStateHandle Chained operator state
- * @param keyGroupsStateHandles Key-group state (= partitioned state)
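+ * @param checkpointStateHandles all checkpointed operator state
+ * @param chainedPartitionableStateHandle partitionable operator state handles; only applied if
+ *        {@code checkpointStateHandles} is not null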
*/
- public void setInitialState(
- ChainedStateHandle<StreamStateHandle> chainedStateHandle,
- List<KeyGroupsStateHandle> keyGroupsStateHandles) {
+ public void setInitialState(CheckpointStateHandles checkpointStateHandles, List<Collection<OperatorStateHandle>> chainedPartitionableStateHandle) {
if (state != ExecutionState.CREATED) {
throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
}
- this.chainedStateHandle = chainedStateHandle;
- this.keyGroupsStateHandles = keyGroupsStateHandles;
+
+ if(checkpointStateHandles != null) {
+ this.chainedStateHandle = checkpointStateHandles.getNonPartitionedStateHandles();
+ this.chainedPartitionableStateHandle = chainedPartitionableStateHandle;
+ this.keyGroupsStateHandles = checkpointStateHandles.getKeyGroupsStateHandle();
+ }
}
// --------------------------------------------------------------------------------------------
@@ -385,6 +395,7 @@ public class Execution {
slot,
chainedStateHandle,
keyGroupsStateHandles,
+ chainedPartitionableStateHandle,
attemptNumber);
// register this execution at the execution graph, to receive call backs
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c3fa0b..6023205 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -56,10 +56,8 @@ import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.runtime.util.SerializedThrowable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.SerializedValue;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a8d5ee4..4837803 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
@@ -53,6 +54,7 @@ import scala.concurrent.duration.FiniteDuration;
import java.io.Serializable;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
@@ -629,6 +631,7 @@ public class ExecutionVertex {
SimpleSlot targetSlot,
ChainedStateHandle<StreamStateHandle> operatorState,
List<KeyGroupsStateHandle> keyGroupStates,
+ List<Collection<OperatorStateHandle>> partitionableOperatorStateHandle,
int attemptNumber) {
// Produced intermediate results
@@ -681,7 +684,8 @@ public class ExecutionVertex {
classpaths,
targetSlot.getRoot().getSlotNumber(),
operatorState,
- keyGroupStates);
+ keyGroupStates,
+ partitionableOperatorStateHandle);
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 9ddfdf7..55e3e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,8 +20,10 @@ package org.apache.flink.runtime.jobgraph.tasks;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
+import java.util.Collection;
import java.util.List;
/**
@@ -33,11 +35,16 @@ public interface StatefulTask {
/**
* Sets the initial state of the operator, upon recovery. The initial state is typically
* a snapshot of the state from a previous execution.
- *
+ *
+ * TODO this should use {@link org.apache.flink.runtime.state.CheckpointStateHandles} after redoing chained state.
+ *
* @param chainedState Handle for the chained operator states.
* @param keyGroupsState Handle for key group states.
*/
- void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
+ void setInitialState(
+ ChainedStateHandle<StreamStateHandle> chainedState,
+ List<KeyGroupsStateHandle> keyGroupsState,
+ List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception;
/**
* This method is called to trigger a checkpoint, asynchronously by the checkpoint
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 72396eb..e95e7b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -20,11 +20,7 @@ package org.apache.flink.runtime.messages.checkpoint;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.util.List;
+import org.apache.flink.runtime.state.CheckpointStateHandles;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -32,7 +28,7 @@ import static org.apache.flink.util.Preconditions.checkArgument;
* This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
* {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
* individual task is completed.
- *
+ *
* <p>This message may carry the handle to the task's chained operator state and the key group
* state.
*/
@@ -40,9 +36,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
private static final long serialVersionUID = -7606214777192401493L;
- private final ChainedStateHandle<StreamStateHandle> stateHandle;
- private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
+ private final CheckpointStateHandles checkpointStateHandles;
/** The duration (in milliseconds) that the synchronous part of the checkpoint took */
private final long synchronousDurationMillis;
@@ -62,24 +57,22 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
JobID job,
ExecutionAttemptID taskExecutionId,
long checkpointId) {
- this(job, taskExecutionId, checkpointId, null, null);
+ this(job, taskExecutionId, checkpointId, null);
}
public AcknowledgeCheckpoint(
JobID job,
ExecutionAttemptID taskExecutionId,
long checkpointId,
- ChainedStateHandle<StreamStateHandle> state,
- List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
- this(job, taskExecutionId, checkpointId, state, keyGroupStateAndSizes, -1L, -1L, -1L, -1L);
+ CheckpointStateHandles checkpointStateHandles) {
+ this(job, taskExecutionId, checkpointId, checkpointStateHandles, -1L, -1L, -1L, -1L);
}
public AcknowledgeCheckpoint(
JobID job,
ExecutionAttemptID taskExecutionId,
long checkpointId,
- ChainedStateHandle<StreamStateHandle> state,
- List<KeyGroupsStateHandle> keyGroupStateAndSizes,
+ CheckpointStateHandles checkpointStateHandles,
long synchronousDurationMillis,
long asynchronousDurationMillis,
long bytesBufferedInAlignment,
@@ -87,9 +80,7 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
super(job, taskExecutionId, checkpointId);
- // these may be null in cases where the operator has no state
- this.stateHandle = state;
- this.keyGroupsStateHandle = keyGroupStateAndSizes;
+ this.checkpointStateHandles = checkpointStateHandles;
// these may be "-1", in case the values are unknown or not set
checkArgument(synchronousDurationMillis >= -1);
@@ -107,12 +98,8 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
// properties
// ------------------------------------------------------------------------
- public ChainedStateHandle<StreamStateHandle> getStateHandle() {
- return stateHandle;
- }
-
- public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
- return keyGroupsStateHandle;
+ public CheckpointStateHandles getCheckpointStateHandles() {
+ return checkpointStateHandles;
}
public long getSynchronousDurationMillis() {
@@ -134,31 +121,33 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
// --------------------------------------------------------------------------------------------
@Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
public boolean equals(Object o) {
if (this == o) {
- return true ;
+ return true;
}
- else if (o instanceof AcknowledgeCheckpoint) {
- AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
- return super.equals(o) &&
- (this.stateHandle == null ? that.stateHandle == null :
- (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
- (this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null :
- (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
+ if (!(o instanceof AcknowledgeCheckpoint)) {
+ return false;
}
- else {
+ if (!super.equals(o)) {
return false;
}
+
+ AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
+ return checkpointStateHandles != null ?
+ checkpointStateHandles.equals(that.checkpointStateHandles) : that.checkpointStateHandles == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = super.hashCode();
+ result = 31 * result + (checkpointStateHandles != null ? checkpointStateHandles.hashCode() : 0);
+ return result;
}
@Override
public String toString() {
- return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s keyGroupState=%s",
- getCheckpointId(), getJob(), getTaskExecutionId(), stateHandle, keyGroupsStateHandle);
+ return String.format("Confirm Task Checkpoint %d for (%s/%s) - state=%s",
+ getCheckpointId(), getJob(), getTaskExecutionId(), checkpointStateHandles);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
deleted file mode 100644
index 5966c95..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractCloseableHandle.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-/**
- * A simple base for closable handles.
- *
- * Offers to register a stream (or other closable object) that close calls are delegated to if
- * the handle is closed or was already closed.
- */
-public abstract class AbstractCloseableHandle implements Closeable, StateObject {
-
- /** Serial Version UID must be constant to maintain format compatibility */
- private static final long serialVersionUID = 1L;
-
- /** To atomically update the "closable" field without needing to add a member class like "AtomicBoolean */
- private static final AtomicIntegerFieldUpdater<AbstractCloseableHandle> CLOSER =
- AtomicIntegerFieldUpdater.newUpdater(AbstractCloseableHandle.class, "isClosed");
-
- // ------------------------------------------------------------------------
-
- /** The closeable to close if this handle is closed late */
- private transient volatile Closeable toClose;
-
- /** Flag to remember if this handle was already closed */
- @SuppressWarnings("unused") // this field is actually updated, but via the "CLOSER" updater
- private transient volatile int isClosed;
-
- // ------------------------------------------------------------------------
-
- protected final void registerCloseable(Closeable toClose) throws IOException {
- if (toClose == null) {
- return;
- }
-
- // NOTE: The order of operations matters here:
- // (1) first setting the closeable
- // (2) checking the flag.
- // Because the order in the {@link #close()} method is the opposite, and
- // both variables are volatile (reordering barriers), we can be sure that
- // one of the methods always notices the effect of a concurrent call to the
- // other method.
-
- this.toClose = toClose;
-
- // check if we were closed early
- if (this.isClosed != 0) {
- toClose.close();
- throw new IOException("handle is closed");
- }
- }
-
- /**
- * Closes the handle.
- *
- * <p>If a "Closeable" has been registered via {@link #registerCloseable(Closeable)},
- * then this will be closes.
- *
- * <p>If any "Closeable" will be registered via {@link #registerCloseable(Closeable)} in the future,
- * it will immediately be closed and that method will throw an exception.
- *
- * @throws IOException Exceptions occurring while closing an already registered {@code Closeable}
- * are forwarded.
- *
- * @see #registerCloseable(Closeable)
- */
- @Override
- public final void close() throws IOException {
- // NOTE: The order of operations matters here:
- // (1) first setting the closed flag
- // (2) checking whether there is already a closeable
- // Because the order in the {@link #registerCloseable(Closeable)} method is the opposite, and
- // both variables are volatile (reordering barriers), we can be sure that
- // one of the methods always notices the effect of a concurrent call to the
- // other method.
-
- if (CLOSER.compareAndSet(this, 0, 1)) {
- final Closeable toClose = this.toClose;
- if (toClose != null) {
- this.toClose = null;
- toClose.close();
- }
- }
- }
-
- /**
- * Checks whether this handle has been closed.
- *
- * @return True is the handle is closed, false otherwise.
- */
- public boolean isClosed() {
- return isClosed != 0;
- }
-
- /**
- * This method checks whether the handle is closed and throws an exception if it is closed.
- * If the handle is not closed, this method does nothing.
- *
- * @throws IOException Thrown, if the handle has been closed.
- */
- public void ensureNotClosed() throws IOException {
- if (isClosed != 0) {
- throw new IOException("handle is closed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
new file mode 100644
index 0000000..7ca3b38
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateBackend;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+
+/**
+ * Base implementation of KeyedStateBackend. The state can be checkpointed
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ *
+ * @param <K> Type of the key by which state is keyed.
+ */
+public abstract class AbstractKeyedStateBackend<K>
+ implements KeyedStateBackend<K>, SnapshotProvider<KeyGroupsStateHandle>, Closeable {
+
+ /** {@link TypeSerializer} for our key. */
+ protected final TypeSerializer<K> keySerializer;
+
+ /** The currently active key, as last set via {@link #setCurrentKey(Object)}. */
+ protected K currentKey;
+
+ /** The key group of the currently active key; recomputed whenever the current key is set. */
+ private int currentKeyGroup;
+
+ /**
+ * All states created so far, indexed by the name of their state descriptor, so that repeated
+ * lookups with the same name hand out the same state object. Created lazily on first lookup.
+ */
+ protected HashMap<String, KvState<?>> keyValueStatesByName;
+
+ /** Name of the most recently accessed partitioned state (single-entry cache, see {@link #lastState}). */
+ private String lastName;
+
+ /** The state object that belongs to {@link #lastName}. */
+ @SuppressWarnings("rawtypes")
+ private KvState lastState;
+
+ /** The number of key-groups aka max parallelism */
+ protected final int numberOfKeyGroups;
+
+ /** Range of key-groups for which this backend is responsible */
+ protected final KeyGroupRange keyGroupRange;
+
+ /** KvStateRegistry helper for this task */
+ protected final TaskKvStateRegistry kvStateRegistry;
+
+ /** Registry for all opened streams, so they can be closed if the task using this backend is closed */
+ protected ClosableRegistry cancelStreamRegistry;
+
+ /**
+ * Class loader handed in at construction time. Not used by this base class itself;
+ * presumably intended for subclasses that need to load user classes - TODO confirm.
+ */
+ protected final ClassLoader userCodeClassLoader;
+
+ /**
+ * Creates the base state of a keyed state backend and a fresh, empty registry for
+ * the streams that have to be closed when the backend is closed.
+ *
+ * @param kvStateRegistry Registry used to publish queryable state; must not be null.
+ * @param keySerializer Serializer for the keys; must not be null.
+ * @param userCodeClassLoader Class loader that is kept for subclasses; must not be null.
+ * @param numberOfKeyGroups The total number of key-groups.
+ * @param keyGroupRange The range of key-groups this backend is responsible for; must not be null.
+ *
+ * @throws NullPointerException Thrown, if any of the reference arguments is null.
+ */
+ public AbstractKeyedStateBackend(
+ TaskKvStateRegistry kvStateRegistry,
+ TypeSerializer<K> keySerializer,
+ ClassLoader userCodeClassLoader,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange) {
+
+ this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+ this.keySerializer = Preconditions.checkNotNull(keySerializer);
+ // a primitive int cannot be null; a null check would only auto-box the value
+ this.numberOfKeyGroups = numberOfKeyGroups;
+ this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
+ this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+ this.cancelStreamRegistry = new ClosableRegistry();
+ }
+
+ /**
+ * Closes the state backend, releasing all internal resources, but does not delete any persistent
+ * checkpoint data.
+ *
+ * <p>Concretely, this unregisters everything from the {@link TaskKvStateRegistry} and drops the
+ * references to all state objects created so far. The stream registry is not touched here;
+ * it is closed by {@link #close()}.
+ */
+ @Override
+ public void dispose() {
+ if (kvStateRegistry != null) {
+ kvStateRegistry.unregisterAll();
+ }
+
+ // drop the lookup cache and the state map so the state objects can be collected
+ lastName = null;
+ lastState = null;
+ keyValueStatesByName = null;
+ }
+
+ /**
+ * Creates and returns a new {@link ValueState}.
+ *
+ * @param namespaceSerializer TypeSerializer for the state namespace.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ *
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the value that the {@code ValueState} can store.
+ *
+ * @return The newly created state.
+ * @throws Exception Implementations may forward any failure that occurs while creating the state.
+ */
+ protected abstract <N, T> ValueState<T> createValueState(TypeSerializer<N> namespaceSerializer, ValueStateDescriptor<T> stateDesc) throws Exception;
+
+ /**
+ * Creates and returns a new {@link ListState}.
+ *
+ * @param namespaceSerializer TypeSerializer for the state namespace.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ *
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that the {@code ListState} can store.
+ *
+ * @return The newly created state.
+ * @throws Exception Implementations may forward any failure that occurs while creating the state.
+ */
+ protected abstract <N, T> ListState<T> createListState(TypeSerializer<N> namespaceSerializer, ListStateDescriptor<T> stateDesc) throws Exception;
+
+ /**
+ * Creates and returns a new {@link ReducingState}.
+ *
+ * @param namespaceSerializer TypeSerializer for the state namespace.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ *
+ * @param <N> The type of the namespace.
+ * @param <T> The type of the values that the {@code ReducingState} can store.
+ *
+ * @return The newly created state.
+ * @throws Exception Implementations may forward any failure that occurs while creating the state.
+ */
+ protected abstract <N, T> ReducingState<T> createReducingState(TypeSerializer<N> namespaceSerializer, ReducingStateDescriptor<T> stateDesc) throws Exception;
+
+ /**
+ * Creates and returns a new {@link FoldingState}.
+ *
+ * @param namespaceSerializer TypeSerializer for the state namespace.
+ * @param stateDesc The {@code StateDescriptor} that contains the name of the state.
+ *
+ * @param <N> The type of the namespace.
+ * @param <T> Type of the values folded into the state
+ * @param <ACC> Type of the value in the state
+ *
+ * @return The newly created state.
+ * @throws Exception Implementations may forward any failure that occurs while creating the state.
+ */
+ protected abstract <N, T, ACC> FoldingState<T, ACC> createFoldingState(TypeSerializer<N> namespaceSerializer, FoldingStateDescriptor<T, ACC> stateDesc) throws Exception;
+
+ /**
+ * Sets the currently active key and derives the key group that this key is assigned to,
+ * based on the total number of key-groups.
+ *
+ * @param newKey The key to make active.
+ *
+ * @see KeyedStateBackend
+ */
+ @Override
+ public void setCurrentKey(K newKey) {
+ this.currentKey = newKey;
+ // keep the cached key group in sync with the key
+ this.currentKeyGroup = KeyGroupRangeAssignment.assignToKeyGroup(newKey, numberOfKeyGroups);
+ }
+
+ /**
+ * Returns the serializer for the keys, as passed to the constructor.
+ *
+ * @see KeyedStateBackend
+ */
+ @Override
+ public TypeSerializer<K> getKeySerializer() {
+ return keySerializer;
+ }
+
+ /**
+ * Returns the key that was last set via {@link #setCurrentKey(Object)}.
+ *
+ * @see KeyedStateBackend
+ */
+ @Override
+ public K getCurrentKey() {
+ return currentKey;
+ }
+
+ /**
+ * Returns the key group that was computed for the current key in {@link #setCurrentKey(Object)}.
+ *
+ * @see KeyedStateBackend
+ */
+ @Override
+ public int getCurrentKeyGroupIndex() {
+ return currentKeyGroup;
+ }
+
+ /**
+ * Returns the total number of key-groups, as passed to the constructor.
+ *
+ * @see KeyedStateBackend
+ */
+ @Override
+ public int getNumberOfKeyGroups() {
+ return numberOfKeyGroups;
+ }
+
+ /**
+ * Returns the range of key-groups this backend is responsible for, as passed to the constructor.
+ *
+ * @see KeyedStateBackend
+ */
+ public KeyGroupRange getKeyGroupRange() {
+ return keyGroupRange;
+ }
+
+ /**
+ * Returns the partitioned state for the given descriptor, with its current namespace set to
+ * {@code namespace}. The state object is looked up by the descriptor's name: first in a
+ * single-entry cache of the last accessed state, then in the map of all states created so far.
+ * Only if neither contains it, a new state is created through the {@code create...State}
+ * methods of this backend, remembered under the descriptor's name and, if the descriptor is
+ * queryable, registered at the {@link TaskKvStateRegistry}.
+ *
+ * @param namespace The namespace to set on the returned state; must not be null.
+ * @param namespaceSerializer Serializer for the namespace; must not be null. Only used when the
+ * state has to be created.
+ * @param stateDescriptor Descriptor that names the state and binds it to a concrete state type.
+ *
+ * @param <N> The type of the namespace.
+ * @param <S> The type of the state.
+ *
+ * @return The state for the descriptor's name, scoped to the given namespace.
+ * @throws Exception Failures while creating the state are forwarded.
+ *
+ * @see KeyedStateBackend
+ */
+ @Override
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+ Preconditions.checkNotNull(namespace, "Namespace");
+ Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");
+
+ if (keySerializer == null) {
+ throw new RuntimeException("State key serializer has not been configured in the config. " +
+ "This operation cannot use partitioned state.");
+ }
+
+ // NOTE(review): falls back to a default ExecutionConfig when the descriptor's serializer was
+ // not initialized by the caller - confirm that this is acceptable for all callers
+ if (!stateDescriptor.isSerializerInitialized()) {
+ stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
+ }
+
+ // the map is created lazily (and reset to null by dispose())
+ if (keyValueStatesByName == null) {
+ keyValueStatesByName = new HashMap<>();
+ }
+
+ // fast path: same state name as in the previous call
+ if (lastName != null && lastName.equals(stateDescriptor.getName())) {
+ lastState.setCurrentNamespace(namespace);
+ return (S) lastState;
+ }
+
+ // second chance: the state was created earlier, make it the cached state
+ KvState<?> previous = keyValueStatesByName.get(stateDescriptor.getName());
+ if (previous != null) {
+ lastState = previous;
+ lastState.setCurrentNamespace(namespace);
+ lastName = stateDescriptor.getName();
+ return (S) previous;
+ }
+
+ // create a new blank key/value state
+ // the anonymous StateBackend only forwards to this backend's factory methods and adds
+ // the namespace serializer, which the descriptor-facing interface does not carry
+ S state = stateDescriptor.bind(new StateBackend() {
+ @Override
+ public <T> ValueState<T> createValueState(ValueStateDescriptor<T> stateDesc) throws Exception {
+ return AbstractKeyedStateBackend.this.createValueState(namespaceSerializer, stateDesc);
+ }
+
+ @Override
+ public <T> ListState<T> createListState(ListStateDescriptor<T> stateDesc) throws Exception {
+ return AbstractKeyedStateBackend.this.createListState(namespaceSerializer, stateDesc);
+ }
+
+ @Override
+ public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> stateDesc) throws Exception {
+ return AbstractKeyedStateBackend.this.createReducingState(namespaceSerializer, stateDesc);
+ }
+
+ @Override
+ public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+ return AbstractKeyedStateBackend.this.createFoldingState(namespaceSerializer, stateDesc);
+ }
+
+ });
+
+ // the created states are expected to also implement KvState (otherwise this cast fails)
+ KvState kvState = (KvState) state;
+
+ keyValueStatesByName.put(stateDescriptor.getName(), kvState);
+
+ lastName = stateDescriptor.getName();
+ lastState = kvState;
+
+ kvState.setCurrentNamespace(namespace);
+
+ // Publish queryable state
+ if (stateDescriptor.isQueryable()) {
+ if (kvStateRegistry == null) {
+ throw new IllegalStateException("State backend has not been initialized for job.");
+ }
+
+ String name = stateDescriptor.getQueryableStateName();
+ kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
+ }
+
+ return state;
+ }
+
+ /**
+ * Merges the state of all {@code sources} namespaces into the {@code target} namespace, for the
+ * current key. The state of every source namespace is read and then cleared; the combined
+ * result is added to the state of the target namespace afterwards.
+ *
+ * <p>Only {@link ReducingStateDescriptor} (values are combined with the descriptor's reduce
+ * function) and {@link ListStateDescriptor} (values are concatenated in source order) are
+ * supported.
+ *
+ * @param target The namespace that receives the merged state.
+ * @param sources The namespaces whose state is merged and cleared.
+ * @param namespaceSerializer Serializer for the namespaces.
+ * @param stateDescriptor Descriptor of the state to merge.
+ *
+ * @param <N> The type of the namespace.
+ * @param <S> The type of the state.
+ *
+ * @throws RuntimeException Thrown, if the descriptor is of an unsupported type.
+ * @throws Exception Failures while accessing the state or reducing values are forwarded.
+ */
+ @Override
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+ if (stateDescriptor instanceof ReducingStateDescriptor) {
+ ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+ ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
+ ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+ KvState kvState = (KvState) state;
+ Object result = null;
+ for (N source: sources) {
+ kvState.setCurrentNamespace(source);
+ Object sourceValue = state.get();
+ if (result == null) {
+ // the first value seeds the result; reuse the value that was just read
+ // instead of fetching it from the state a second time
+ result = sourceValue;
+ } else if (sourceValue != null) {
+ result = reduceFn.reduce(result, sourceValue);
+ }
+ state.clear();
+ }
+ // write the combined value only after all sources have been read and cleared
+ kvState.setCurrentNamespace(target);
+ if (result != null) {
+ state.add(result);
+ }
+ } else if (stateDescriptor instanceof ListStateDescriptor) {
+ ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+ KvState kvState = (KvState) state;
+ List<Object> result = new ArrayList<>();
+ for (N source: sources) {
+ kvState.setCurrentNamespace(source);
+ Iterable<Object> sourceValue = state.get();
+ if (sourceValue != null) {
+ // copy the elements before clearing, the iterable may be backed by the state
+ for (Object o : sourceValue) {
+ result.add(o);
+ }
+ }
+ state.clear();
+ }
+ kvState.setCurrentNamespace(target);
+ for (Object o : result) {
+ state.add(o);
+ }
+ } else {
+ throw new RuntimeException("Cannot merge states for " + stateDescriptor);
+ }
+ }
+
+ /**
+ * Closes the registry of opened streams. State objects and the KvState registration are not
+ * affected; those are released by {@link #dispose()}.
+ *
+ * @throws IOException Forwarded from closing the stream registry.
+ */
+ @Override
+ public void close() throws IOException {
+ cancelStreamRegistry.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 0d2bf45..c2e665b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
/**
@@ -36,31 +37,33 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
* Creates a {@link CheckpointStreamFactory} that can be used to create streams
* that should end up in a checkpoint.
*
- * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+ * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
* @param operatorIdentifier An identifier of the operator for which we create streams.
*/
public abstract CheckpointStreamFactory createStreamFactory(
JobID jobId,
- String operatorIdentifier) throws IOException;
+ String operatorIdentifier
+ ) throws IOException;
/**
- * Creates a new {@link KeyedStateBackend} that is responsible for keeping keyed state
+ * Creates a new {@link AbstractKeyedStateBackend} that is responsible for keeping keyed state
* and can be checkpointed to checkpoint streams.
*/
- public abstract <K> KeyedStateBackend<K> createKeyedStateBackend(
+ public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception;
+ TaskKvStateRegistry kvStateRegistry
+ ) throws Exception;
/**
- * Creates a new {@link KeyedStateBackend} that restores its state from the given list
+ * Creates a new {@link AbstractKeyedStateBackend} that restores its state from the given list
* {@link KeyGroupsStateHandle KeyGroupStateHandles}.
*/
- public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend(
+ public abstract <K> AbstractKeyedStateBackend<K> restoreKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
@@ -68,6 +71,30 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
List<KeyGroupsStateHandle> restoredState,
- TaskKvStateRegistry kvStateRegistry) throws Exception;
+ TaskKvStateRegistry kvStateRegistry
+ ) throws Exception;
+
+ /**
+ * Creates a new {@link OperatorStateBackend} that can be used for storing partitionable operator
+ * state in checkpoint streams.
+ *
+ * <p>This default implementation returns a new, empty {@link DefaultOperatorStateBackend} and
+ * does not make use of its arguments.
+ *
+ * @param env The environment of the task (unused by the default implementation).
+ * @param operatorIdentifier An identifier of the operator (unused by the default implementation).
+ *
+ * @return The newly created operator state backend.
+ * @throws Exception Declared so that overriding implementations may forward failures.
+ */
+ public OperatorStateBackend createOperatorStateBackend(
+ Environment env,
+ String operatorIdentifier
+ ) throws Exception {
+ return new DefaultOperatorStateBackend();
+ }
+
+ /**
+ * Creates a new {@link OperatorStateBackend} that restores its state from the given collection of
+ * {@link OperatorStateHandle}.
+ *
+ * <p>This default implementation hands the state handles to a new
+ * {@link DefaultOperatorStateBackend} and does not make use of the other arguments.
+ *
+ * @param env The environment of the task (unused by the default implementation).
+ * @param operatorIdentifier An identifier of the operator (unused by the default implementation).
+ * @param restoreSnapshots The state handles to restore from.
+ *
+ * @return The operator state backend created from the given handles.
+ * @throws Exception Declared so that overriding implementations may forward failures.
+ */
+ public OperatorStateBackend restoreOperatorStateBackend(
+ Environment env,
+ String operatorIdentifier,
+ Collection<OperatorStateHandle> restoreSnapshots
+ ) throws Exception {
+ return new DefaultOperatorStateBackend(restoreSnapshots);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index 74057ee..c6904c0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -26,7 +26,7 @@ import java.util.Collections;
import java.util.List;
/**
- * Handle to the non-partitioned states for the operators in an operator chain.
+ * Handle to state handles for the operators in an operator chain.
*/
public class ChainedStateHandle<T extends StateObject> implements StateObject {
@@ -123,9 +123,4 @@ public class ChainedStateHandle<T extends StateObject> implements StateObject {
public static <T extends StateObject> ChainedStateHandle<T> wrapSingleHandle(T stateHandleToWrap) {
return new ChainedStateHandle<T>(Collections.singletonList(stateHandleToWrap));
}
-
- @Override
- public void close() throws IOException {
- StateUtil.bestEffortCloseAllStateObjects(operatorStateHandles);
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
new file mode 100644
index 0000000..9daf963
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStateHandles.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Container that bundles the state handles of all the different state types that make up the
+ * checkpointed state of a task. Any of the contained handles may be {@code null}.
+ * TODO This will be changed in the future if we get rid of chained state and instead connect state directly to individual operators in a chain.
+ */
+public class CheckpointStateHandles implements Serializable {
+
+ private static final long serialVersionUID = 3252351989995L;
+
+ /** Handles to the non-partitioned states of the operators in the chain. */
+ private final ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles;
+
+ /** Handles to the partitionable operator states of the operators in the chain. */
+ private final ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles;
+
+ /** Handles to the key-group states. */
+ private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
+
+ /**
+ * Creates a container for the given handles. The arguments are stored as they are,
+ * without validation or copying.
+ */
+ public CheckpointStateHandles(
+ ChainedStateHandle<StreamStateHandle> nonPartitionedStateHandles,
+ ChainedStateHandle<OperatorStateHandle> partitioneableStateHandles,
+ List<KeyGroupsStateHandle> keyGroupsStateHandle) {
+
+ this.nonPartitionedStateHandles = nonPartitionedStateHandles;
+ this.partitioneableStateHandles = partitioneableStateHandles;
+ this.keyGroupsStateHandle = keyGroupsStateHandle;
+ }
+
+ public ChainedStateHandle<StreamStateHandle> getNonPartitionedStateHandles() {
+ return nonPartitionedStateHandles;
+ }
+
+ public ChainedStateHandle<OperatorStateHandle> getPartitioneableStateHandles() {
+ return partitioneableStateHandles;
+ }
+
+ public List<KeyGroupsStateHandle> getKeyGroupsStateHandle() {
+ return keyGroupsStateHandle;
+ }
+
+ /** Two containers are equal if all three handles are pairwise equal (or both null). */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof CheckpointStateHandles)) {
+ return false;
+ }
+
+ final CheckpointStateHandles other = (CheckpointStateHandles) o;
+ return equalOrBothNull(nonPartitionedStateHandles, other.nonPartitionedStateHandles)
+ && equalOrBothNull(partitioneableStateHandles, other.partitioneableStateHandles)
+ && equalOrBothNull(keyGroupsStateHandle, other.keyGroupsStateHandle);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = hashOrZero(nonPartitionedStateHandles);
+ hash = 31 * hash + hashOrZero(partitioneableStateHandles);
+ hash = 31 * hash + hashOrZero(keyGroupsStateHandle);
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder text = new StringBuilder("CheckpointStateHandles{");
+ text.append("nonPartitionedStateHandles=").append(nonPartitionedStateHandles);
+ text.append(", partitioneableStateHandles=").append(partitioneableStateHandles);
+ text.append(", keyGroupsStateHandle=").append(keyGroupsStateHandle);
+ text.append('}');
+ return text.toString();
+ }
+
+ /** Null-safe equality: true if both are null, or if {@code first.equals(second)}. */
+ private static boolean equalOrBothNull(Object first, Object second) {
+ return first == null ? second == null : first.equals(second);
+ }
+
+ /** Null-safe hash code: zero for null, otherwise the object's own hash code. */
+ private static int hashOrZero(Object value) {
+ return value == null ? 0 : value.hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
new file mode 100644
index 0000000..26d6192
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ClosableRegistry.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Registry of {@link Closeable} objects that are closed together when the registry itself is
+ * closed.
+ *
+ * <p>Every access to the registered set and to the {@code closed} flag happens while holding one
+ * lock (the set itself). Once {@link #close()} has been called, further registrations are
+ * rejected.
+ */
+public class ClosableRegistry implements Closeable {
+
+    /** The currently registered closeables; this set also serves as the lock object. */
+    private final Set<Closeable> registeredCloseables;
+
+    /** Set by {@link #close()}; only read and written while holding the lock. */
+    private boolean closed;
+
+    /** Creates an empty registry that is open for registrations. */
+    public ClosableRegistry() {
+        this.registeredCloseables = new HashSet<>();
+        this.closed = false;
+    }
+
+    /**
+     * Registers a {@link Closeable} so that it is closed when this registry is closed.
+     *
+     * @param closeable the object to register; {@code null} is ignored
+     * @return {@code true} if the object was newly added, {@code false} if it was {@code null} or
+     *         already registered
+     * @throws IllegalStateException if this registry has already been closed
+     */
+    public boolean registerClosable(Closeable closeable) {
+
+        if (null == closeable) {
+            return false;
+        }
+
+        synchronized (getSynchronizationLock()) {
+            if (closed) {
+                throw new IllegalStateException("Cannot register Closable, registry is already closed.");
+            }
+
+            return registeredCloseables.add(closeable);
+        }
+    }
+
+    /**
+     * Removes a {@link Closeable} from this registry without closing it.
+     *
+     * @param closeable the object to remove; {@code null} is ignored
+     * @return {@code true} if the object was registered and has been removed, {@code false}
+     *         otherwise
+     */
+    public boolean unregisterClosable(Closeable closeable) {
+
+        if (null == closeable) {
+            return false;
+        }
+
+        synchronized (getSynchronizationLock()) {
+            return registeredCloseables.remove(closeable);
+        }
+    }
+
+    /**
+     * Closes every registered object via {@code IOUtils.closeQuietly}, empties the registry and
+     * marks it as closed.
+     *
+     * <p>The lock is always taken and the registry is always marked closed, even when nothing is
+     * registered. Otherwise closing an empty registry would leave it open, and a stream registered
+     * afterwards would never be closed by this registry.
+     */
+    @Override
+    public void close() throws IOException {
+
+        synchronized (getSynchronizationLock()) {
+
+            // Reject registrations from now on, whether or not anything is registered.
+            closed = true;
+
+            // Closing happens under the lock, so it is serialized with unregisterClosable(...).
+            for (Closeable closeable : registeredCloseables) {
+                IOUtils.closeQuietly(closeable);
+            }
+
+            registeredCloseables.clear();
+        }
+    }
+
+    /** Returns the object that guards {@code registeredCloseables} and {@code closed}. */
+    private Object getSynchronizationLock() {
+        return registeredCloseables;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
new file mode 100644
index 0000000..0bd5eeb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Default implementation of OperatorStateStore that provides the ability to make snapshots.
+ *
+ * <p>Partitionable list states are kept in memory, restored lazily from snapshot handles on first
+ * access, and written to a checkpoint stream by {@link #snapshot}.
+ */
+public class DefaultOperatorStateBackend implements OperatorStateBackend {
+
+    /** List states handed out so far, keyed by state name. */
+    private final Map<String, PartitionableListState<?>> registeredStates;
+
+    /** Snapshots consulted when a state name is requested for the first time; may be {@code null}. */
+    private final Collection<OperatorStateHandle> restoreSnapshots;
+
+    /** Streams that are currently open, so that {@link #close()} can close them. */
+    private final ClosableRegistry closeStreamOnCancelRegistry;
+
+    /**
+     * Restores an OperatorStateStore (lazily) using the provided snapshots.
+     *
+     * @param restoreSnapshots snapshots that are available to restore partitionable states on
+     *                         request; {@code null} means there is nothing to restore
+     */
+    public DefaultOperatorStateBackend(Collection<OperatorStateHandle> restoreSnapshots) {
+        this.restoreSnapshots = restoreSnapshots;
+        this.registeredStates = new HashMap<>();
+        this.closeStreamOnCancelRegistry = new ClosableRegistry();
+    }
+
+    /**
+     * Creates an empty OperatorStateStore.
+     */
+    public DefaultOperatorStateBackend() {
+        this(null);
+    }
+
+    /**
+     * Returns the list state registered under the descriptor's name, creating it on first access.
+     *
+     * <p>On first access, every restore snapshot that has partition offsets for this name is read
+     * and its partitions are appended to the new state. The state is registered only after the
+     * restore has finished, so a failed restore does not leave a half-filled state behind that a
+     * later call would return.
+     *
+     * @param stateDescriptor descriptor providing the state's name and element serializer
+     * @return the (possibly restored) list state for that name
+     * @throws IOException if reading a restore snapshot fails
+     * @see OperatorStateStore
+     */
+    @Override
+    public <S> ListState<S> getPartitionableState(
+            ListStateDescriptor<S> stateDescriptor) throws IOException {
+
+        Preconditions.checkNotNull(stateDescriptor);
+
+        String name = Preconditions.checkNotNull(stateDescriptor.getName());
+        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getSerializer());
+
+        @SuppressWarnings("unchecked")
+        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) registeredStates.get(name);
+
+        if (null == partitionableListState) {
+
+            partitionableListState = new PartitionableListState<>(partitionStateSerializer);
+
+            // Try to restore previous state if state handles to snapshots are provided
+            if (restoreSnapshots != null) {
+                for (OperatorStateHandle stateHandle : restoreSnapshots) {
+
+                    long[] offsets = stateHandle.getStateNameToPartitionOffsets().get(name);
+
+                    if (offsets != null) {
+                        restorePartitions(stateHandle, offsets, partitionStateSerializer, partitionableListState);
+                    }
+                }
+            }
+
+            // Register last: if the restore above throws, no partially restored state is kept.
+            registeredStates.put(name, partitionableListState);
+        }
+
+        return partitionableListState;
+    }
+
+    /**
+     * Reads the partitions found at the given offsets of one snapshot and appends them to a state.
+     *
+     * <p>The input stream is registered with the close registry while it is read, so that
+     * {@link #close()} can close it, and is always unregistered and closed afterwards.
+     */
+    private <S> void restorePartitions(
+            OperatorStateHandle stateHandle,
+            long[] offsets,
+            TypeSerializer<S> serializer,
+            PartitionableListState<S> target) throws IOException {
+
+        FSDataInputStream in = stateHandle.openInputStream();
+        try {
+            closeStreamOnCancelRegistry.registerClosable(in);
+
+            DataInputView div = new DataInputViewStreamWrapper(in);
+
+            for (long offset : offsets) {
+                in.seek(offset);
+                target.add(serializer.deserialize(div));
+            }
+        } finally {
+            closeStreamOnCancelRegistry.unregisterClosable(in);
+            in.close();
+        }
+    }
+
+    /**
+     * Writes all registered states to a new checkpoint stream.
+     *
+     * <p>The stream starts with the number of registered states, followed by the partitions of
+     * each state; the stream position at which each partition starts is recorded per state name.
+     * The handle is created before this method returns and is wrapped in a {@code DoneFuture}.
+     *
+     * @param checkpointId  passed on to the stream factory
+     * @param timestamp     passed on to the stream factory
+     * @param streamFactory factory that creates the checkpoint output stream
+     * @return a future holding the handle to the written data, or holding {@code null} if no state
+     *         is registered
+     * @see SnapshotProvider
+     */
+    @Override
+    public RunnableFuture<OperatorStateHandle> snapshot(
+            long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+
+        if (registeredStates.isEmpty()) {
+            return new DoneFuture<>(null);
+        }
+
+        Map<String, long[]> writtenStatesMetaData = new HashMap<>(registeredStates.size());
+
+        CheckpointStreamFactory.CheckpointStateOutputStream out = streamFactory.
+                createCheckpointStateOutputStream(checkpointId, timestamp);
+
+        try {
+            closeStreamOnCancelRegistry.registerClosable(out);
+
+            DataOutputView dov = new DataOutputViewStreamWrapper(out);
+
+            dov.writeInt(registeredStates.size());
+            for (Map.Entry<String, PartitionableListState<?>> entry : registeredStates.entrySet()) {
+
+                long[] partitionOffsets = entry.getValue().write(out);
+                writtenStatesMetaData.put(entry.getKey(), partitionOffsets);
+            }
+
+            OperatorStateHandle handle = new OperatorStateHandle(out.closeAndGetHandle(), writtenStatesMetaData);
+
+            return new DoneFuture<>(handle);
+        } finally {
+            // Runs on success and on failure; on success the stream was already closed by
+            // closeAndGetHandle(), so this presumably is a no-op then -- TODO confirm.
+            closeStreamOnCancelRegistry.unregisterClosable(out);
+            out.close();
+        }
+    }
+
+    /** No-op: this implementation performs no cleanup on dispose. */
+    @Override
+    public void dispose() {
+
+    }
+
+    /**
+     * Returns the names of all states registered so far.
+     *
+     * @return the key set of the internal state map (a live view, not a copy)
+     */
+    @Override
+    public Set<String> getRegisteredStateNames() {
+        return registeredStates.keySet();
+    }
+
+    /** Closes the close registry and thereby every stream that is currently registered with it. */
+    @Override
+    public void close() throws IOException {
+        closeStreamOnCancelRegistry.close();
+    }
+
+    /**
+     * In-memory {@link ListState} whose elements can be written one by one to a stream, recording
+     * the stream position at which each element starts.
+     */
+    static final class PartitionableListState<S> implements ListState<S> {
+
+        /** The elements of this state, in insertion order. */
+        private final List<S> listState;
+
+        /** Serializer used to write the individual elements. */
+        private final TypeSerializer<S> partitionStateSerializer;
+
+        public PartitionableListState(TypeSerializer<S> partitionStateSerializer) {
+            this.listState = new ArrayList<>();
+            this.partitionStateSerializer = Preconditions.checkNotNull(partitionStateSerializer);
+        }
+
+        @Override
+        public void clear() {
+            listState.clear();
+        }
+
+        /** Returns the backing list itself (not a copy). */
+        @Override
+        public Iterable<S> get() {
+            return listState;
+        }
+
+        @Override
+        public void add(S value) {
+            listState.add(value);
+        }
+
+        /**
+         * Serializes all elements to the given stream.
+         *
+         * @param out stream to write to
+         * @return for each element, in list order, the stream position right before it was written
+         * @throws IOException if querying the position or serializing an element fails
+         */
+        public long[] write(FSDataOutputStream out) throws IOException {
+
+            long[] partitionOffsets = new long[listState.size()];
+
+            DataOutputView dov = new DataOutputViewStreamWrapper(out);
+
+            for (int i = 0; i < listState.size(); ++i) {
+                S element = listState.get(i);
+                partitionOffsets[i] = out.getPos();
+                partitionStateSerializer.serialize(element, dov);
+            }
+
+            return partitionOffsets;
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
index 4f0a82b..8e7207e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
@@ -31,6 +31,8 @@ import java.util.Iterator;
*/
public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> , Serializable {
+ private static final long serialVersionUID = 6595415219136429696L;
+
/** the range of key-groups */
private final KeyGroupRange keyGroupRange;
http://git-wip-us.apache.org/repos/asf/flink/blob/53ed6ada/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 7f87e86..ea12808 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -138,7 +138,6 @@ public class KeyGroupsStateHandle implements StateObject {
return false;
}
return stateHandle.equals(that.stateHandle);
-
}
@Override
@@ -155,9 +154,4 @@ public class KeyGroupsStateHandle implements StateObject {
", data=" + stateHandle +
'}';
}
-
- @Override
- public void close() throws IOException {
- stateHandle.close();
- }
}