You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/25 21:24:37 UTC

[2/3] flink git commit: [FLINK-2888] [streaming] State backends return copies of the default values

[FLINK-2888] [streaming] State backends return copies of the default values


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e4cb0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e4cb0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e4cb0ae

Branch: refs/heads/release-0.10
Commit: 8e4cb0ae0bdc7f8c60542edabad57e4fa2f0c61e
Parents: c2811ce
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 22 11:24:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100

----------------------------------------------------------------------
 .../runtime/state/AbstractHeapKvState.java      |  3 +-
 .../runtime/state/FileStateBackendTest.java     | 40 ++++++++++++++++----
 .../runtime/state/MemoryStateBackendTest.java   | 31 ++++++++++++---
 3 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
index 12250b9..23703b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
@@ -88,7 +88,8 @@ public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Bac
 	@Override
 	public V value() {
 		V value = state.get(currentKey);
-		return value != null ? value : defaultValue;
+		return value != null ? value : 
+				(defaultValue == null ? null : valueSerializer.copy(defaultValue));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 481fb98..7182a36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -23,20 +23,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
 
 import org.junit.Test;
 
@@ -382,6 +378,36 @@ public class FileStateBackendTest {
 			deleteDirectorySilently(tempDir);
 		}
 	}
+
+	@Test
+	public void testCopyDefaultValue() {
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+			backend.initializeForJob(new JobID());
+			
+			KvState<Integer, IntValue, FsStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+			kv.setCurrentKey(1);
+			IntValue default1 = kv.value();
+
+			kv.setCurrentKey(2);
+			IntValue default2 = kv.value();
+
+			assertNotNull(default1);
+			assertNotNull(default2);
+			assertEquals(default1, default2);
+			assertFalse(default1 == default2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 	//  Utilities
@@ -411,7 +437,7 @@ public class FileStateBackendTest {
 	}
 	
 	private static String localFileUri(File path) {
-		return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+		return path.toURI().toString();
 	}
 	
 	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 5f95b33..87a050b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -21,14 +21,11 @@ package org.apache.flink.runtime.state;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.FloatSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.IntValue;
 import org.apache.flink.types.StringValue;
 import org.junit.Test;
 
@@ -279,4 +276,28 @@ public class MemoryStateBackendTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testCopyDefaultValue() {
+		try {
+			MemoryStateBackend backend = new MemoryStateBackend();
+			KvState<Integer, IntValue, MemoryStateBackend> kv =
+					backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+			kv.setCurrentKey(1);
+			IntValue default1 = kv.value();
+
+			kv.setCurrentKey(2);
+			IntValue default2 = kv.value();
+			
+			assertNotNull(default1);
+			assertNotNull(default2);
+			assertEquals(default1, default2);
+			assertFalse(default1 == default2);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
 }