You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/22 22:12:11 UTC

[4/9] flink git commit: [hotfix] [runtime] Various code cleanups and reductions of warnings in heap state restoring code

[hotfix] [runtime] Various code cleanups and reductions of warnings in heap state restoring code


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b8a784e9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b8a784e9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b8a784e9

Branch: refs/heads/master
Commit: b8a784e93811a71f525070cee8ff32230fee8fee
Parents: 3b97128
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Jan 20 14:41:35 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Jan 22 21:53:39 2017 +0100

----------------------------------------------------------------------
 .../state/heap/HeapKeyedStateBackend.java       |  7 ++-
 .../flink/runtime/state/heap/StateTable.java    | 60 +++++++++++++-------
 2 files changed, 44 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 89d4f76..b05b874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -239,6 +239,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
+	@SuppressWarnings("deprecation")
 	@Override
 	public void restore(Collection<KeyGroupsStateHandle> restoredState) throws Exception {
 		LOG.info("Initializing heap keyed state backend from snapshot.");
@@ -388,6 +389,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		return "HeapKeyedStateBackend";
 	}
 
+	@SuppressWarnings({"unchecked", "rawtypes", "deprecation"})
 	@Deprecated
 	private void restoreOldSavepointKeyedState(
 			Collection<KeyGroupsStateHandle> stateHandles) throws IOException, ClassNotFoundException {
@@ -447,13 +449,14 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 							stateSerializer);
 
 			StateTable<K, ?, ?> stateTable = new StateTable<>(registeredBackendStateMetaInfo, keyGroupRange);
-			stateTable.getState().set(0, rawResultMap);
+			stateTable.getState()[0] = rawResultMap;
 
 			// add named state to the backend
 			stateTables.put(registeredBackendStateMetaInfo.getName(), stateTable);
 		}
 	}
 
+	@SuppressWarnings("deprecation")
 	private RestoredState restoreHeapState(AbstractMemStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
 		return new RestoredState(
 				stateSnapshot.deserialize(),
@@ -461,6 +464,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				stateSnapshot.getStateSerializer());
 	}
 
+	@SuppressWarnings({"rawtypes", "unchecked", "deprecation"})
 	private RestoredState restoreFsState(AbstractFsStateSnapshot<K, ?, ?, ?, ?> stateSnapshot) throws IOException {
 		FileSystem fs = stateSnapshot.getFilePath().getFileSystem();
 		//TODO register closeable to support fast cancelation?
@@ -492,6 +496,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 	}
 
+	@SuppressWarnings("rawtypes")
 	static final class RestoredState {
 
 		private final Map rawResultMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/b8a784e9/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
index 9d7232e..21265f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/StateTable.java
@@ -22,44 +22,64 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.RegisteredBackendStateMetaInfo;
 import org.apache.flink.runtime.state.KeyGroupRange;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
 
 public class StateTable<K, N, ST> {
 
-	/** Combined meta information such as name and serializers for this state */
-	protected RegisteredBackendStateMetaInfo<N, ST> metaInfo;
-
 	/** Map for holding the actual state objects. */
-	private final List<Map<N, Map<K, ST>>> state;
+	private final Map<N, Map<K, ST>>[] state;
 
-	protected final KeyGroupRange keyGroupRange;
+	/** The offset to the contiguous key groups */
+	private final int keyGroupOffset;
 
-	public StateTable(
-			RegisteredBackendStateMetaInfo<N, ST> metaInfo,
-			KeyGroupRange keyGroupRange) {
+	/** Combined meta information such as name and serializers for this state */
+	private RegisteredBackendStateMetaInfo<N, ST> metaInfo;
+
+	// ------------------------------------------------------------------------
+	public StateTable(RegisteredBackendStateMetaInfo<N, ST> metaInfo, KeyGroupRange keyGroupRange) {
 		this.metaInfo = metaInfo;
-		this.keyGroupRange = keyGroupRange;
+		this.keyGroupOffset = keyGroupRange.getStartKeyGroup();
 
-		this.state = Arrays.asList((Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()]);
+		@SuppressWarnings("unchecked")
+		Map<N, Map<K, ST>>[] state = (Map<N, Map<K, ST>>[]) new Map[keyGroupRange.getNumberOfKeyGroups()];
+		this.state = state;
 	}
 
-	private int indexToOffset(int index) {
-		return index - keyGroupRange.getStartKeyGroup();
+	// ------------------------------------------------------------------------
+	//  access to maps
+	// ------------------------------------------------------------------------
+
+	public Map<N, Map<K, ST>>[] getState() {
+		return state;
 	}
 
 	public Map<N, Map<K, ST>> get(int index) {
-		return keyGroupRange.contains(index) ? state.get(indexToOffset(index)) : null;
+		final int pos = indexToOffset(index);
+		if (pos >= 0 && pos < state.length) {
+			return state[pos];
+		} else {
+			return null;
+		}
 	}
 
 	public void set(int index, Map<N, Map<K, ST>> map) {
-		if (!keyGroupRange.contains(index)) {
-			throw new RuntimeException("Unexpected key group index. This indicates a bug.");
+		try {
+			state[indexToOffset(index)] = map;
+		}
+		catch (ArrayIndexOutOfBoundsException e) {
+			throw new IllegalArgumentException("Key group index out of range of key group range [" +
+					keyGroupOffset + ", " + (keyGroupOffset + state.length) + ").");
 		}
-		state.set(indexToOffset(index), map);
 	}
 
+	private int indexToOffset(int index) {
+		return index - keyGroupOffset;
+	}
+
+	// ------------------------------------------------------------------------
+	//  metadata
+	// ------------------------------------------------------------------------
+	
 	public TypeSerializer<ST> getStateSerializer() {
 		return metaInfo.getStateSerializer();
 	}
@@ -76,10 +96,6 @@ public class StateTable<K, N, ST> {
 		this.metaInfo = metaInfo;
 	}
 
-	public List<Map<N, Map<K, ST>>> getState() {
-		return state;
-	}
-
 	// ------------------------------------------------------------------------
 	//  for testing
 	// ------------------------------------------------------------------------