You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2016/08/24 07:08:26 UTC

flink git commit: [FLINK-4441] Make RocksDB backend return null on empty state + add test for all backends

Repository: flink
Updated Branches:
  refs/heads/master 863dc1805 -> c09ff0359


[FLINK-4441] Make RocksDB backend return null on empty state + add test for all backends

Closes #2399


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c09ff035
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c09ff035
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c09ff035

Branch: refs/heads/master
Commit: c09ff03596fc91b6de6c358f18ead766c4ea552a
Parents: 863dc18
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Aug 22 07:38:18 2016 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Wed Aug 24 09:07:24 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    |  4 ++--
 .../runtime/state/StateBackendTestBase.java     | 23 ++++++++++++++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c09ff035/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 9496d12..2c9a5d2 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -386,7 +386,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	@Override
 	public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
 		if (keyValueStatesByName == null || keyValueStatesByName.size() == 0) {
-			return new HashMap<>();
+			return null;
 		}
 
 		if (fullyAsyncBackup) {
@@ -482,7 +482,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 
 	@Override
 	public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
-		if (keyValueStateSnapshots.size() == 0) {
+		if (keyValueStateSnapshots == null) {
 			return;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c09ff035/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index d59e17b..7b00b27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1048,6 +1048,29 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 
 	}
+	
+	@Test
+	public void testEmptyStateCheckpointing() {
+		try {
+			DummyEnvironment env = new DummyEnvironment("test", 1, 0);
+			backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+
+			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshot = backend
+					.snapshotPartitionedState(682375462379L, 1);
+			
+			assertNull(snapshot);
+			backend.dispose();
+
+			// Make sure we can restore from empty state
+			backend.initializeForJob(env, "test_op", IntSerializer.INSTANCE);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot);
+			backend.dispose();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 
 	private static class AppendingReduce implements ReduceFunction<String> {
 		@Override