You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by uc...@apache.org on 2016/10/06 15:04:41 UTC
[2/4] flink git commit: [hotfix] Make KeyGroupsStateHandle implement StreamStateHandle
[hotfix] Make KeyGroupsStateHandle implement StreamStateHandle
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fee14309
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fee14309
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fee14309
Branch: refs/heads/master
Commit: fee143099a5ac7220622617c79abae60786555e4
Parents: 98710ea
Author: Stefan Richter <s....@data-artisans.com>
Authored: Sun Oct 2 14:15:48 2016 +0200
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Thu Oct 6 17:04:09 2016 +0200
----------------------------------------------------------------------
.../state/RocksDBKeyedStateBackend.java | 2 +-
.../savepoint/SavepointV1Serializer.java | 2 +-
.../runtime/state/KeyGroupsStateHandle.java | 20 +++++++++++---------
.../state/heap/HeapKeyedStateBackend.java | 2 +-
.../checkpoint/CheckpointCoordinatorTest.java | 7 +++----
5 files changed, 17 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 126ebd2..7ab35c4 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -624,7 +624,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
private void restoreKeyGroupsInStateHandle()
throws IOException, RocksDBException, ClassNotFoundException {
try {
- currentStateHandleInStream = currentKeyGroupsStateHandle.getStateHandle().openInputStream();
+ currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
restoreKVStateMetaData();
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
index 536062a..f120e1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Serializer.java
@@ -205,7 +205,7 @@ class SavepointV1Serializer implements SavepointSerializer<SavepointV1> {
for (int keyGroup : stateHandle.keyGroups()) {
dos.writeLong(stateHandle.getOffsetForKeyGroup(keyGroup));
}
- serializeStreamStateHandle(stateHandle.getStateHandle(), dos);
+ serializeStreamStateHandle(stateHandle.getDelegateStateHandle(), dos);
} else {
dos.writeByte(NULL_HANDLE);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index ea12808..1d277b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -28,7 +29,7 @@ import java.io.IOException;
* consists of a range of key group snapshots. A key group is subset of the available
* key space. The key groups are identified by their key group indices.
*/
-public class KeyGroupsStateHandle implements StateObject {
+public class KeyGroupsStateHandle implements StreamStateHandle {
private static final long serialVersionUID = -8070326169926626355L;
@@ -104,14 +105,6 @@ public class KeyGroupsStateHandle implements StateObject {
return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
}
- /**
- *
- * @return the inner stream state handle to the actual key-group states
- */
- public StreamStateHandle getStateHandle() {
- return stateHandle;
- }
-
@Override
public void discardState() throws Exception {
stateHandle.discardState();
@@ -123,6 +116,15 @@ public class KeyGroupsStateHandle implements StateObject {
}
@Override
+ public FSDataInputStream openInputStream() throws IOException {
+ return stateHandle.openInputStream();
+ }
+
+ public StreamStateHandle getDelegateStateHandle() {
+ return stateHandle;
+ }
+
+ @Override
public boolean equals(Object o) {
if (this == o) {
return true;
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index a766373..040677b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -259,7 +259,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
try {
- fsDataInputStream = keyGroupsHandle.getStateHandle().openInputStream();
+ fsDataInputStream = keyGroupsHandle.openInputStream();
cancelStreamRegistry.registerClosable(fsDataInputStream);
DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
http://git-wip-us.apache.org/repos/asf/flink/blob/fee14309/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 5fb0e6f..972f0ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2474,7 +2474,7 @@ public class CheckpointCoordinatorTest {
assertEquals(expectedTotalKeyGroups, actualTotalKeyGroups);
- try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.getStateHandle().openInputStream()) {
+ try (FSDataInputStream inputStream = expectedHeadOpKeyGroupStateHandle.openInputStream()) {
for (int groupId : expectedHeadOpKeyGroupStateHandle.keyGroups()) {
long offset = expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
inputStream.seek(offset);
@@ -2483,9 +2483,8 @@ public class CheckpointCoordinatorTest {
for (KeyGroupsStateHandle oneActualKeyGroupStateHandle : actualPartitionedKeyGroupState) {
if (oneActualKeyGroupStateHandle.containsKeyGroup(groupId)) {
long actualOffset = oneActualKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
- try (FSDataInputStream actualInputStream = oneActualKeyGroupStateHandle.
- getStateHandle().openInputStream()) {
-
+ try (FSDataInputStream actualInputStream =
+ oneActualKeyGroupStateHandle.openInputStream()) {
actualInputStream.seek(actualOffset);
int actualGroupState = InstantiationUtil.