You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/25 21:24:37 UTC
[2/3] flink git commit: [FLINK-2888] [streaming] State backends return copies of the default values
[FLINK-2888] [streaming] State backends return copies of the default values
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e4cb0ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e4cb0ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e4cb0ae
Branch: refs/heads/release-0.10
Commit: 8e4cb0ae0bdc7f8c60542edabad57e4fa2f0c61e
Parents: c2811ce
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Oct 22 11:24:12 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Oct 25 19:12:28 2015 +0100
----------------------------------------------------------------------
.../runtime/state/AbstractHeapKvState.java | 3 +-
.../runtime/state/FileStateBackendTest.java | 40 ++++++++++++++++----
.../runtime/state/MemoryStateBackendTest.java | 31 ++++++++++++---
3 files changed, 61 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
index 12250b9..23703b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractHeapKvState.java
@@ -88,7 +88,8 @@ public abstract class AbstractHeapKvState<K, V, Backend extends StateBackend<Bac
@Override
public V value() {
V value = state.get(currentKey);
- return value != null ? value : defaultValue;
+ return value != null ? value :
+ (defaultValue == null ? null : valueSerializer.copy(defaultValue));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 481fb98..7182a36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -23,20 +23,16 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
-import org.apache.flink.util.OperatingSystem;
import org.junit.Test;
@@ -382,6 +378,36 @@ public class FileStateBackendTest {
deleteDirectorySilently(tempDir);
}
}
+
+ @Test
+ public void testCopyDefaultValue() {
+ File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+ try {
+ FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
+ backend.initializeForJob(new JobID());
+
+ KvState<Integer, IntValue, FsStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+ kv.setCurrentKey(1);
+ IntValue default1 = kv.value();
+
+ kv.setCurrentKey(2);
+ IntValue default2 = kv.value();
+
+ assertNotNull(default1);
+ assertNotNull(default2);
+ assertEquals(default1, default2);
+ assertFalse(default1 == default2);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ finally {
+ deleteDirectorySilently(tempDir);
+ }
+ }
// ------------------------------------------------------------------------
// Utilities
@@ -411,7 +437,7 @@ public class FileStateBackendTest {
}
private static String localFileUri(File path) {
- return (OperatingSystem.isWindows() ? "file:/" : "file://") + path.getAbsolutePath();
+ return path.toURI().toString();
}
private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e4cb0ae/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 5f95b33..87a050b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -21,14 +21,11 @@ package org.apache.flink.runtime.state;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.ValueSerializer;
-import org.apache.flink.runtime.state.KvState;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.types.IntValue;
import org.apache.flink.types.StringValue;
import org.junit.Test;
@@ -279,4 +276,28 @@ public class MemoryStateBackendTest {
fail(e.getMessage());
}
}
+
+ @Test
+ public void testCopyDefaultValue() {
+ try {
+ MemoryStateBackend backend = new MemoryStateBackend();
+ KvState<Integer, IntValue, MemoryStateBackend> kv =
+ backend.createKvState(IntSerializer.INSTANCE, IntValueSerializer.INSTANCE, new IntValue(-1));
+
+ kv.setCurrentKey(1);
+ IntValue default1 = kv.value();
+
+ kv.setCurrentKey(2);
+ IntValue default2 = kv.value();
+
+ assertNotNull(default1);
+ assertNotNull(default2);
+ assertEquals(default1, default2);
+ assertFalse(default1 == default2);
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
}