You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2016/08/24 07:08:26 UTC
flink git commit: [FLINK-4441] Make RocksDB backend return null on
empty state + add test for all backends
Repository: flink
Updated Branches:
refs/heads/master 863dc1805 -> c09ff0359
[FLINK-4441] Make RocksDB backend return null on empty state + add test for all backends
Closes #2399
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c09ff035
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c09ff035
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c09ff035
Branch: refs/heads/master
Commit: c09ff03596fc91b6de6c358f18ead766c4ea552a
Parents: 863dc18
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Aug 22 07:38:18 2016 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Aug 24 09:07:24 2016 +0200
----------------------------------------------------------------------
.../streaming/state/RocksDBStateBackend.java | 4 ++--
.../runtime/state/StateBackendTestBase.java | 23 ++++++++++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c09ff035/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9496d12..2c9a5d2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -386,7 +386,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
@Override
public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
- return new HashMap<>();
+ return null;
}
if (fullyAsyncBackup) {
@@ -482,7 +482,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
@Override
public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
- if (keyValueStateSnapshots.size() == 0) {
+ if (keyValueStateSnapshots == null) {
return;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c09ff035/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index d59e17b..7b00b27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1048,6 +1048,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
}
+
+ @Test
+ public void testEmptyStateCheckpointing() {
+ try {
+ DummyEnvironment env = new DummyEnvironment("test", 1, 0);
+ backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+
+ HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend
+ .snapshotPartitionedState(682375462379L, 1);
+
+ assertNull(snapshot);
+ backend.dispose();
+
+ // Make sure we can restore from empty state
+ backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+ backend.injectKeyValueStateSnapshots((HashMap) snapshot);
+ backend.dispose();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
private static class AppendingReduce implements ReduceFunction<String> {
@Override