You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/09/06 08:24:01 UTC

[flink] branch master updated: [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2c2aafd  [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend
2c2aafd is described below

commit 2c2aafddc1861c0a6b52b436352d5bd84e3ccd37
Author: Aitozi <10...@qq.com>
AuthorDate: Thu Sep 6 16:23:56 2018 +0800

    [FLINK-10254][backend]Fix inappropriate checkNotNull in stateBackend
---
 .../org/apache/flink/runtime/state/AbstractKeyedStateBackend.java    | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 1c2d2a3..19de210 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -100,9 +100,12 @@ public abstract class AbstractKeyedStateBackend<K> implements
 		ExecutionConfig executionConfig,
 		TtlTimeProvider ttlTimeProvider) {
 
+		Preconditions.checkArgument(numberOfKeyGroups >= 1, "NumberOfKeyGroups must be a positive number");
+		Preconditions.checkArgument(numberOfKeyGroups >= keyGroupRange.getNumberOfKeyGroups(), "The total number of key groups must be at least the number in the key group range assigned to this backend");
+
 		this.kvStateRegistry = kvStateRegistry;
 		this.keySerializer = Preconditions.checkNotNull(keySerializer);
-		this.numberOfKeyGroups = Preconditions.checkNotNull(numberOfKeyGroups);
+		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
 		this.cancelStreamRegistry = new CloseableRegistry();