You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:41 UTC

[2/4] flink git commit: [hotfix] Make KeyGroupsStateHandle implement StreamStateHandle

[hotfix] Make KeyGroupsStateHandle implement StreamStateHandle


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fee14309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fee14309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fee14309

Branch: refs/heads/master
Commit: fee143099a5ac7220622617c79abae60786555e4
Parents: 98710ea
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:15:48 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:09 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  2 +-
 .../savepoint/SavepointV1Serializer.java        |  2 +-
 .../runtime/state/KeyGroupsStateHandle.java     | 20 +++++++++++---------
 .../state/heap/HeapKeyedStateBackend.java       |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  7 +++----
 5 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 126ebd2..7ab35c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -624,7 +624,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private void restoreKeyGroupsInStateHandle()
 				throws IOException, RocksDBException, ClassNotFoundException {
 			try {
-				currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
 				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
 				restoreKVStateMetaData();

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 536062a..f120e1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -205,7 +205,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
 			for (int keyGroup : stateHandle.keyGroups()) {
 				dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
 			}
-			serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
+			serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
 		} else {
 			dos.writeByte(NULL_HANDLE);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index ea12808..1d277b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -28,7 +29,7 @@ import java.io.IOException;
  * consists of a range of key group snapshots. A key group is subset of the available
  * key space. The key groups are identified by their key group indices.
  */
-public class KeyGroupsStateHandle implements StateObject {
+public class KeyGroupsStateHandle implements StreamStateHandle {
 
 	private static final long serialVersionUID = -8070326169926626355L;
 
@@ -104,14 +105,6 @@ public class KeyGroupsStateHandle implements StateObject {
 		return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
 	}
 
-	/**
-	 *
-	 * @return the inner stream state handle to the actual key-group states
-	 */
-	public StreamStateHandle getStateHandle() {
-		return stateHandle;
-	}
-
 	@Override
 	public void discardState() throws Exception {
 		stateHandle.discardState();
@@ -123,6 +116,15 @@ public class KeyGroupsStateHandle implements StateObject {
 	}
 
 	@Override
+	public FSDataInputStream openInputStream() throws IOException {
+		return stateHandle.openInputStream();
+	}
+
+	public StreamStateHandle getDelegateStateHandle() {
+		return stateHandle;
+	}
+
+	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
 			return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a766373..040677b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -259,7 +259,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			try {
 
-				fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
+				fsDataInputStream = keyGroupsHandle.openInputStream();
 				cancelStreamRegistry.registerClosable(fsDataInputStream);
 
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);

http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5fb0e6f..972f0ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2474,7 +2474,7 @@ public class CheckpointCoordinatorTest {
 
 		assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups);
 
-		try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.getStateHandle().openInputStream()) {
+		try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
 			for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) {
 				long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
 				inputStream.seek(offset);
@@ -2483,9 +2483,8 @@ public class CheckpointCoordinatorTest {
 				for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) {
 					if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) {
 						long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
-						try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.
-								getStateHandle().openInputStream()) {
-
+						try (FSDataInputStream actualInputStream =
+								     oneActualKeyGroupStateHandle.openInputStream()) {
 							actualInputStream.seek(actualOffset);
 
 							int actualGroupState = InstantiationUtil.