You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/15 17:44:29 UTC

flink git commit: [FLINK-6018] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()

Repository: flink
Updated Branches:
  refs/heads/master 609b2577a -> 264f6df8e


[FLINK-6018] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/264f6df8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/264f6df8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/264f6df8

Branch: refs/heads/master
Commit: 264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c
Parents: 609b257
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Tue Mar 14 22:26:53 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Mar 15 18:43:58 2017 +0100

----------------------------------------------------------------------
 .../streaming/state/RocksDBKeyedStateBackend.java       |  6 ++++--
 .../contrib/streaming/state/RocksDBStateBackend.java    |  3 ++-
 .../flink/runtime/state/AbstractKeyedStateBackend.java  | 11 ++++++-----
 .../flink/runtime/state/filesystem/FsStateBackend.java  |  3 ++-
 .../flink/runtime/state/heap/HeapKeyedStateBackend.java |  6 ++++--
 .../flink/runtime/state/memory/MemoryStateBackend.java  |  3 ++-
 .../netty/message/KvStateRequestSerializerTest.java     |  7 +++++--
 .../runtime/state/heap/HeapAggregatingStateTest.java    |  3 ++-
 .../flink/runtime/state/heap/HeapListStateTest.java     |  3 ++-
 .../flink/runtime/state/heap/HeapReducingStateTest.java |  3 ++-
 .../test/query/KVStateRequestSerializerRocksDBTest.java | 12 ++++++++----
 11 files changed, 39 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index eb926c0..aaccc2f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.contrib.streaming.state;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -144,10 +145,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange
+			KeyGroupRange keyGroupRange,
+			ExecutionConfig executionConfig
 	) throws IOException {
 
-		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 		this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions);
 		this.dbOptions = Preconditions.checkNotNull(dbOptions);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index dd0e2f7..16b0869 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -259,7 +259,8 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 				kvStateRegistry,
 				keySerializer,
 				numberOfKeyGroups,
-				keyGroupRange);
+				keyGroupRange,
+				env.getExecutionConfig());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 14f897f..aba00f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -94,12 +94,15 @@ public abstract class AbstractKeyedStateBackend<K>
 
 	protected final ClassLoader userCodeClassLoader;
 
+	private final ExecutionConfig executionConfig;
+
 	public AbstractKeyedStateBackend(
 			TaskKvStateRegistry kvStateRegistry,
 			TypeSerializer<K> keySerializer,
 			ClassLoader userCodeClassLoader,
 			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange) {
+			KeyGroupRange keyGroupRange,
+			ExecutionConfig executionConfig) {
 
 		this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -108,6 +111,7 @@ public abstract class AbstractKeyedStateBackend<K>
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.cancelStreamRegistry = new CloseableRegistry();
 		this.keyValueStatesByName = new HashMap<>();
+		this.executionConfig = executionConfig;
 	}
 
 	/**
@@ -349,10 +353,7 @@ public abstract class AbstractKeyedStateBackend<K>
 
 		checkNotNull(namespace, "Namespace");
 
-		// TODO: This is wrong, it should throw an exception that the initialization has not properly happened
-		if (!stateDescriptor.isSerializerInitialized()) {
-			stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
+		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
 
 		if (lastName != null && lastName.equals(stateDescriptor.getName())) {
 			lastState.setCurrentNamespace(namespace);

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 5e8a15d..2e9198f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -208,7 +208,8 @@ public class FsStateBackend extends AbstractStateBackend {
 				keySerializer,
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
-				keyGroupRange);
+				keyGroupRange,
+				env.getExecutionConfig());
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 4a5455a..a4a08c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.heap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -100,9 +101,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			TypeSerializer<K> keySerializer,
 			ClassLoader userCodeClassLoader,
 			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange) {
+			KeyGroupRange keyGroupRange,
+			ExecutionConfig executionConfig) {
 
-		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
 
 		LOG.info("Initializing heap keyed state backend with stream factory.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 6e6b034..da01c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -97,6 +97,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 				keySerializer,
 				env.getUserClassLoader(),
 				numberOfKeyGroups,
-				keyGroupRange);
+				keyGroupRange,
+				env.getExecutionConfig());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index dd61a3f..93094a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.UnpooledByteBufAllocator;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -331,7 +332,8 @@ public class KvStateRequestSerializerTest {
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
 				ClassLoader.getSystemClassLoader(),
-				1, new KeyGroupRange(0, 0)
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -430,7 +432,8 @@ public class KvStateRequestSerializerTest {
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
 				ClassLoader.getSystemClassLoader(),
-				1, new KeyGroupRange(0, 0)
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
index a7ae5be..735b5f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -236,7 +236,8 @@ public class HeapAggregatingStateTest {
 				StringSerializer.INSTANCE,
 				HeapAggregatingStateTest.class.getClassLoader(),
 				16,
-				new KeyGroupRange(0, 15));
+				new KeyGroupRange(0, 15),
+				new ExecutionConfig());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 746db28..c36a48b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -232,7 +232,8 @@ public class HeapListStateTest {
 				StringSerializer.INSTANCE,
 				HeapListStateTest.class.getClassLoader(),
 				16,
-				new KeyGroupRange(0, 15));
+				new KeyGroupRange(0, 15),
+				new ExecutionConfig());
 	}
 	
 	private static <T> void validateResult(Iterable<T> values, Set<T> expected) {

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
index e0929f1..63eec04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -236,7 +236,8 @@ public class HeapReducingStateTest {
 				StringSerializer.INSTANCE,
 				HeapReducingStateTest.class.getClassLoader(),
 				16,
-				new KeyGroupRange(0, 15));
+				new KeyGroupRange(0, 15),
+				new ExecutionConfig());
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 6e2fd62..3c86f90 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.test.query;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -74,12 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
 				final TaskKvStateRegistry kvStateRegistry,
 				final TypeSerializer<K> keySerializer,
 				final int numberOfKeyGroups,
-				final KeyGroupRange keyGroupRange) throws Exception {
+				final KeyGroupRange keyGroupRange,
+				final ExecutionConfig executionConfig) throws Exception {
 
 			super(jobId, operatorIdentifier, userCodeClassLoader,
 				instanceBasePath,
 				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
-				numberOfKeyGroups, keyGroupRange);
+				numberOfKeyGroups, keyGroupRange, executionConfig);
 		}
 
 		@Override
@@ -115,7 +117,8 @@ public final class KVStateRequestSerializerRocksDBTest {
 				columnFamilyOptions,
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
-				1, new KeyGroupRange(0, 0)
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);
 
@@ -150,7 +153,8 @@ public final class KVStateRequestSerializerRocksDBTest {
 				columnFamilyOptions,
 				mock(TaskKvStateRegistry.class),
 				LongSerializer.INSTANCE,
-				1, new KeyGroupRange(0, 0)
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
 			);
 		longHeapKeyedStateBackend.setCurrentKey(key);