You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:11 UTC
[4/9] flink git commit: [hotfix] [runtime] Various code cleanups and reductions of warnings in heap state restoring code
[hotfix] [runtime] Various code cleanups and reductions of warnings in heap state restoring code
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8a784e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8a784e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8a784e9
Branch: refs/heads/master
Commit: b8a784e93811a71f525070cee8ff32230fee8fee
Parents: 3b97128
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 20 14:41:35 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:53:39 2017 +0100
----------------------------------------------------------------------
.../state/heap/HeapKeyedStateBackend.java | 7 ++-
.../flink/runtime/state/heap/StateTable.java | 60 +++++++++++++-------
2 files changed, 44 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 89d4f76..b05b874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -239,6 +239,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
+ @SuppressWarnings("deprecation")
@Override
public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
LOG.info("Initializing heap keyed state backend from snapshot.");
@@ -388,6 +389,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
return "HeapKeyedStateBackend";
}
+ @SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
@Deprecated
private void restoreOldSavepointKeyedState(
Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
@@ -447,13 +449,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateSerializer);
StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
- stateTable.getState().set(0, rawResultMap);
+ stateTable.getState()[0] = rawResultMap;
// add named state to the backend
stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable);
}
}
+ @SuppressWarnings("deprecation")
private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
return new RestoredState(
stateSnapshot.deserialize(),
@@ -461,6 +464,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
stateSnapshot.getStateSerializer());
}
+ @SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
//TODO register closeable to support fast cancelation?
@@ -492,6 +496,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
}
+ @SuppressWarnings("rawtypes")
static final class RestoredState {
private final Map rawResultMap;
http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 9d7232e..21265f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -22,44 +22,64 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
import org.apache.flink.runtime.state.KeyGroupRange;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
public class StateTable<K, N, ST> {
- /** Combined meta information such as name and serializers for this state */
- protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;
-
/** Map for holding the actual state objects. */
- private final List<Map<N, Map<K, ST>>> state;
+ private final Map<N, Map<K, ST>>[] state;
- protected final KeyGroupRange keyGroupRange;
+ /** The start key group of the contiguous key group range; subtracted from a key group index to obtain its position in the state array. */
+ private final int keyGroupOffset;
- public StateTable(
- RegisteredBackendStateMetaInfo<N, ST> metaInfo,
- KeyGroupRange keyGroupRange) {
+ /** Combined meta information such as name and serializers for this state */
+ private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
+
+ // ------------------------------------------------------------------------
+ public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, KeyGroupRange keyGroupRange) {
this.metaInfo = metaInfo;
- this.keyGroupRange = keyGroupRange;
+ this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
- this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()]);
+ @SuppressWarnings("unchecked")
+ Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()];
+ this.state = state;
}
- private int indexToOffset(int index) {
- return index - keyGroupRange.getStartKeyGroup();
+ // ------------------------------------------------------------------------
+ // access to maps
+ // ------------------------------------------------------------------------
+
+ public Map<N, Map<K, ST>>[] getState() {
+ return state;
}
public Map<N, Map<K, ST>> get(int index) {
- return keyGroupRange.contains(index) ? state.get(indexToOffset(index)) : null;
+ final int pos = indexToOffset(index);
+ if (pos >= 0 && pos < state.length) {
+ return state[pos];
+ } else {
+ return null;
+ }
}
public void set(int index, Map<N, Map<K, ST>> map) {
- if (!keyGroupRange.contains(index)) {
- throw new RuntimeException("Unexpected key group index. This indicates a bug.");
+ try {
+ state[indexToOffset(index)] = map;
+ }
+ catch (ArrayIndexOutOfBoundsException e) {
+ throw new IllegalArgumentException("Key group index out of range of key group range [" +
+ keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
}
- state.set(indexToOffset(index), map);
}
+ private int indexToOffset(int index) {
+ return index - keyGroupOffset;
+ }
+
+ // ------------------------------------------------------------------------
+ // metadata
+ // ------------------------------------------------------------------------
+
public TypeSerializer<ST> getStateSerializer() {
return metaInfo.getStateSerializer();
}
@@ -76,10 +96,6 @@ public class StateTable<K, N, ST> {
this.metaInfo = metaInfo;
}
- public List<Map<N, Map<K, ST>>> getState() {
- return state;
- }
-
// ------------------------------------------------------------------------
// for testing
// ------------------------------------------------------------------------