You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/17 11:03:24 UTC
[6/6] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25
Branch: refs/heads/release-1.4
Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879
Parents: 1a68d75
Author: kkloudas <kk...@gmail.com>
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Fri Nov 17 11:21:18 2017 +0100
----------------------------------------------------------------------
.../flink/runtime/query/KvStateRegistry.java | 23 ++++++--------------
1 file changed, 7 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
new ConcurrentHashMap<>();
/** Registry listener to be notified on registration/unregistration. */
- private final AtomicReference<KvStateRegistryListener> listener = new AtomicReference<>();
+ private final AtomicReference<KvStateRegistryListener> listenerRef = new AtomicReference<>();
/**
* Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
* @throws IllegalStateException If there is a registered listener
*/
public void registerListener(KvStateRegistryListener listener) {
- if (!this.listener.compareAndSet(null, listener)) {
+ if (!listenerRef.compareAndSet(null, listener)) {
throw new IllegalStateException("Listener already registered.");
}
}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
* Unregisters the listener with the registry.
*/
public void unregisterListener() {
- listener.set(null);
+ listenerRef.set(null);
}
/**
- * Registers the KvState instance identified by the given 4-tuple of JobID,
- * JobVertexID, key group index, and registration name.
- *
- * @param kvStateId KvStateID to identify the KvState instance
- * @param kvState KvState instance to register
- * @throws IllegalStateException If there is a KvState instance registered
- * with the same ID.
- */
-
- /**
* Registers the KvState instance and returns the assigned ID.
*
* @param jobId JobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
KvStateID kvStateId = new KvStateID();
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
- KvStateRegistryListener listener = this.listener.get();
+ final KvStateRegistryListener listener = listenerRef.get();
if (listener != null) {
listener.notifyKvStateRegistered(
jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
return kvStateId;
} else {
- throw new IllegalStateException(kvStateId + " is already registered.");
+ throw new IllegalStateException(
+ "State \"" + registrationName + "\" (id=" + kvStateId + ") appears registered although it should not.");
}
}
@@ -127,7 +118,7 @@ public class KvStateRegistry {
KvStateID kvStateId) {
if (registeredKvStates.remove(kvStateId) != null) {
- KvStateRegistryListener listener = this.listener.get();
+ final KvStateRegistryListener listener = listenerRef.get();
if (listener != null) {
listener.notifyKvStateUnregistered(
jobId,