You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 11:03:24 UTC

[6/6] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().

[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25

Branch: refs/heads/release-1.4
Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879
Parents: 1a68d75
Author: kkloudas <kk...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:21:18 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/query/KvStateRegistry.java    | 23 ++++++--------------
 1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
 			new ConcurrentHashMap<>();
 
 	/** Registry listener to be notified on registration/unregistration. */
-	private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+	private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
 
 	/**
 	 * Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
 	 * @throws IllegalStateException If there is a registered listener
 	 */
 	public void registerListener(KvStateRegistryListener listener) {
-		if (!this.listener.compareAndSet(null, listener)) {
+		if (!listenerRef.compareAndSet(null, listener)) {
 			throw new IllegalStateException("Listener already registered.");
 		}
 	}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
 	 * Unregisters the listener with the registry.
 	 */
 	public void unregisterListener() {
-		listener.set(null);
+		listenerRef.set(null);
 	}
 
 	/**
-	 * Registers the KvState instance identified by the given 4-tuple of JobID,
-	 * JobVertexID, key group index, and registration name.
-	 *
-	 * @param kvStateId KvStateID to identify the KvState instance
-	 * @param kvState   KvState instance to register
-	 * @throws IllegalStateException If there is a KvState instance registered
-	 *                               with the same ID.
-	 */
-
-	/**
 	 * Registers the KvState instance and returns the assigned ID.
 	 *
 	 * @param jobId            JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
 		KvStateID kvStateId = new KvStateID();
 
 		if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
-			KvStateRegistryListener listener = this.listener.get();
+			final KvStateRegistryListener listener = listenerRef.get();
 			if (listener != null) {
 				listener.notifyKvStateRegistered(
 						jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
 
 			return kvStateId;
 		} else {
-			throw new IllegalStateException(kvStateId + " is already registered.");
+			throw new IllegalStateException(
+					"State \"" + registrationName + "\" (id=" + kvStateId + ") appears registered although it should not.");
 		}
 	}
 
@@ -127,7 +118,7 @@ public class KvStateRegistry {
 			KvStateID kvStateId) {
 
 		if (registeredKvStates.remove(kvStateId) != null) {
-			KvStateRegistryListener listener = this.listener.get();
+			final KvStateRegistryListener listener = listenerRef.get();
 			if (listener != null) {
 				listener.notifyKvStateUnregistered(
 						jobId,