You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/03/15 17:44:29 UTC
flink git commit: [FLINK-6018] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
Repository: flink
Updated Branches:
refs/heads/master 609b2577a -> 264f6df8e
[FLINK-6018] Properly initialise StateDescriptor in AbstractStateBackend.getPartitionedState()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/264f6df8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/264f6df8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/264f6df8
Branch: refs/heads/master
Commit: 264f6df8e0c0fb2f9dfb0cd9beab9d380dc8e00c
Parents: 609b257
Author: 金竹 <ji...@alibaba-inc.com>
Authored: Tue Mar 14 22:26:53 2017 +0800
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Mar 15 18:43:58 2017 +0100
----------------------------------------------------------------------
.../streaming/state/RocksDBKeyedStateBackend.java | 6 ++++--
.../contrib/streaming/state/RocksDBStateBackend.java | 3 ++-
.../flink/runtime/state/AbstractKeyedStateBackend.java | 11 ++++++-----
.../flink/runtime/state/filesystem/FsStateBackend.java | 3 ++-
.../flink/runtime/state/heap/HeapKeyedStateBackend.java | 6 ++++--
.../flink/runtime/state/memory/MemoryStateBackend.java | 3 ++-
.../netty/message/KvStateRequestSerializerTest.java | 7 +++++--
.../runtime/state/heap/HeapAggregatingStateTest.java | 3 ++-
.../flink/runtime/state/heap/HeapListStateTest.java | 3 ++-
.../flink/runtime/state/heap/HeapReducingStateTest.java | 3 ++-
.../test/query/KVStateRequestSerializerRocksDBTest.java | 12 ++++++++----
11 files changed, 39 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index eb926c0..aaccc2f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -17,6 +17,7 @@
package org.apache.flink.contrib.streaming.state;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -144,10 +145,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
- KeyGroupRange keyGroupRange
+ KeyGroupRange keyGroupRange,
+ ExecutionConfig executionConfig
) throws IOException {
- super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+ super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions);
this.dbOptions = Preconditions.checkNotNull(dbOptions);
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index dd0e2f7..16b0869 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -259,7 +259,8 @@ public class RocksDBStateBackend extends AbstractStateBackend {
kvStateRegistry,
keySerializer,
numberOfKeyGroups,
- keyGroupRange);
+ keyGroupRange,
+ env.getExecutionConfig());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 14f897f..aba00f3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -94,12 +94,15 @@ public abstract class AbstractKeyedStateBackend<K>
protected final ClassLoader userCodeClassLoader;
+ private final ExecutionConfig executionConfig;
+
public AbstractKeyedStateBackend(
TaskKvStateRegistry kvStateRegistry,
TypeSerializer<K> keySerializer,
ClassLoader userCodeClassLoader,
int numberOfKeyGroups,
- KeyGroupRange keyGroupRange) {
+ KeyGroupRange keyGroupRange,
+ ExecutionConfig executionConfig) {
this.kvStateRegistry = kvStateRegistry;//Preconditions.checkNotNull(kvStateRegistry);
this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -108,6 +111,7 @@ public abstract class AbstractKeyedStateBackend<K>
this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
this.cancelStreamRegistry = new CloseableRegistry();
this.keyValueStatesByName = new HashMap<>();
+ this.executionConfig = executionConfig;
}
/**
@@ -349,10 +353,7 @@ public abstract class AbstractKeyedStateBackend<K>
checkNotNull(namespace, "Namespace");
- // TODO: This is wrong, it should throw an exception that the initialization has not properly happened
- if (!stateDescriptor.isSerializerInitialized()) {
- stateDescriptor.initializeSerializerUnlessSet(new ExecutionConfig());
- }
+ stateDescriptor.initializeSerializerUnlessSet(executionConfig);
if (lastName != null && lastName.equals(stateDescriptor.getName())) {
lastState.setCurrentNamespace(namespace);
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 5e8a15d..2e9198f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -208,7 +208,8 @@ public class FsStateBackend extends AbstractStateBackend {
keySerializer,
env.getUserClassLoader(),
numberOfKeyGroups,
- keyGroupRange);
+ keyGroupRange,
+ env.getExecutionConfig());
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 4a5455a..a4a08c1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.heap;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.FoldingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
@@ -100,9 +101,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
TypeSerializer<K> keySerializer,
ClassLoader userCodeClassLoader,
int numberOfKeyGroups,
- KeyGroupRange keyGroupRange) {
+ KeyGroupRange keyGroupRange,
+ ExecutionConfig executionConfig) {
- super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
+ super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
LOG.info("Initializing heap keyed state backend with stream factory.");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 6e6b034..da01c09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -97,6 +97,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
keySerializer,
env.getUserClassLoader(),
numberOfKeyGroups,
- keyGroupRange);
+ keyGroupRange,
+ env.getExecutionConfig());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
index dd61a3f..93094a4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java
@@ -22,6 +22,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -331,7 +332,8 @@ public class KvStateRequestSerializerTest {
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
- 1, new KeyGroupRange(0, 0)
+ 1, new KeyGroupRange(0, 0),
+ new ExecutionConfig()
);
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -430,7 +432,8 @@ public class KvStateRequestSerializerTest {
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
ClassLoader.getSystemClassLoader(),
- 1, new KeyGroupRange(0, 0)
+ 1, new KeyGroupRange(0, 0),
+ new ExecutionConfig()
);
longHeapKeyedStateBackend.setCurrentKey(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
index a7ae5be..735b5f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapAggregatingStateTest.java
@@ -236,7 +236,8 @@ public class HeapAggregatingStateTest {
StringSerializer.INSTANCE,
HeapAggregatingStateTest.class.getClassLoader(),
16,
- new KeyGroupRange(0, 15));
+ new KeyGroupRange(0, 15),
+ new ExecutionConfig());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
index 746db28..c36a48b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapListStateTest.java
@@ -232,7 +232,8 @@ public class HeapListStateTest {
StringSerializer.INSTANCE,
HeapListStateTest.class.getClassLoader(),
16,
- new KeyGroupRange(0, 15));
+ new KeyGroupRange(0, 15),
+ new ExecutionConfig());
}
private static <T> void validateResult(Iterable<T> values, Set<T> expected) {
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
index e0929f1..63eec04 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapReducingStateTest.java
@@ -236,7 +236,8 @@ public class HeapReducingStateTest {
StringSerializer.INSTANCE,
HeapReducingStateTest.class.getClassLoader(),
16,
- new KeyGroupRange(0, 15));
+ new KeyGroupRange(0, 15),
+ new ExecutionConfig());
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/264f6df8/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
index 6e2fd62..3c86f90 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.test.query;
+import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -74,12 +75,13 @@ public final class KVStateRequestSerializerRocksDBTest {
final TaskKvStateRegistry kvStateRegistry,
final TypeSerializer<K> keySerializer,
final int numberOfKeyGroups,
- final KeyGroupRange keyGroupRange) throws Exception {
+ final KeyGroupRange keyGroupRange,
+ final ExecutionConfig executionConfig) throws Exception {
super(jobId, operatorIdentifier, userCodeClassLoader,
instanceBasePath,
dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
- numberOfKeyGroups, keyGroupRange);
+ numberOfKeyGroups, keyGroupRange, executionConfig);
}
@Override
@@ -115,7 +117,8 @@ public final class KVStateRequestSerializerRocksDBTest {
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
- 1, new KeyGroupRange(0, 0)
+ 1, new KeyGroupRange(0, 0),
+ new ExecutionConfig()
);
longHeapKeyedStateBackend.setCurrentKey(key);
@@ -150,7 +153,8 @@ public final class KVStateRequestSerializerRocksDBTest {
columnFamilyOptions,
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
- 1, new KeyGroupRange(0, 0)
+ 1, new KeyGroupRange(0, 0),
+ new ExecutionConfig()
);
longHeapKeyedStateBackend.setCurrentKey(key);