You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:14 UTC

[14/14] flink git commit: [FLINK-7770][QS] Hide the queryable state behind a proxy.

[FLINK-7770][QS] Hide the queryable state behind a proxy.

Previously the QueryableStateClient could connect to the JM
and the TMs directly to fetch the required state. Now, there
is a proxy running on each TM and the remote client connects
to one of these proxies in order to get its state. The proxy
receives the request from the client, performs all necessary
message exchanges within the Flink cluster, receives the state
and forwards it back to the client.

This architecture allows for more security features to be
integrated in the future, as the proxy is running in the
cluster, it exposes less information about the cluster to
the outside world, and is more suitable for containerized
environments.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f48f5340
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f48f5340
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f48f5340

Branch: refs/heads/master
Commit: f48f5340a871ac58a649766e434218b67e9322ac
Parents: 29a6e99
Author: kkloudas <kk...@gmail.com>
Authored: Thu Oct 5 15:16:23 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Wed Oct 11 15:33:32 2017 +0200

----------------------------------------------------------------------
 .../configuration/QueryableStateOptions.java    |   2 +-
 .../java/org/apache/flink/util/AbstractID.java  |   4 +-
 .../flink/queryablestate/UnknownJobManager.java |  35 -
 .../UnknownJobManagerException.java             |  36 +
 .../queryablestate/UnknownKeyOrNamespace.java   |  31 -
 .../UnknownKeyOrNamespaceException.java         |  39 +
 .../flink/queryablestate/UnknownKvStateID.java  |  35 -
 .../UnknownKvStateIdException.java              |  42 +
 .../UnknownKvStateKeyGroupLocation.java         |  31 -
 ...UnknownKvStateKeyGroupLocationException.java |  41 +
 .../AkkaKvStateLocationLookupService.java       | 325 --------
 .../queryablestate/client/KvStateClient.java    | 583 --------------
 .../client/KvStateClientHandler.java            | 107 ---
 .../client/KvStateClientHandlerCallback.java    |  54 --
 .../client/KvStateLocationLookupService.java    |  51 --
 .../client/QueryableStateClient.java            | 479 +++--------
 .../client/proxy/KvStateClientProxyHandler.java | 225 ++++++
 .../client/proxy/KvStateClientProxyImpl.java    | 127 +++
 .../messages/KvStateInternalRequest.java        |  93 +++
 .../queryablestate/messages/KvStateRequest.java | 142 ++--
 .../messages/KvStateRequestFailure.java         |  68 --
 .../messages/KvStateRequestResult.java          |  74 --
 .../messages/KvStateResponse.java               |  75 ++
 .../network/AbstractServerBase.java             | 241 ++++++
 .../network/AbstractServerHandler.java          | 306 ++++++++
 .../network/BadRequestException.java            |  35 +
 .../queryablestate/network/ChunkedByteBuf.java  | 100 +++
 .../flink/queryablestate/network/Client.java    | 537 +++++++++++++
 .../queryablestate/network/ClientHandler.java   | 122 +++
 .../network/ClientHandlerCallback.java          |  56 ++
 .../network/messages/MessageBody.java           |  38 +
 .../network/messages/MessageDeserializer.java   |  39 +
 .../network/messages/MessageSerializer.java     | 228 +++---
 .../network/messages/MessageType.java           |   7 +-
 .../network/messages/RequestFailure.java        |  71 ++
 .../queryablestate/server/ChunkedByteBuf.java   |  98 ---
 .../server/KvStateServerHandler.java            | 279 +------
 .../server/KvStateServerImpl.java               | 222 ++----
 .../itcases/AbstractQueryableStateITCase.java   | 534 +++++++------
 .../itcases/HAAbstractQueryableStateITCase.java |   8 +-
 .../NonHAAbstractQueryableStateITCase.java      |  10 +-
 .../AkkaKvStateLocationLookupServiceTest.java   | 399 ----------
 .../queryablestate/network/ClientTest.java      | 784 +++++++++++++++++++
 .../network/KvStateClientHandlerTest.java       |  24 +-
 .../network/KvStateClientTest.java              | 752 ------------------
 .../network/KvStateRequestSerializerTest.java   | 214 -----
 .../network/KvStateServerHandlerTest.java       | 286 ++++---
 .../network/KvStateServerTest.java              |  30 +-
 .../network/MessageSerializerTest.java          | 220 ++++++
 .../network/QueryableStateClientTest.java       | 458 -----------
 .../flink/runtime/concurrent/FutureUtils.java   |  16 +
 .../runtime/io/network/NetworkEnvironment.java  |  44 +-
 .../flink/runtime/query/KvStateClientProxy.java |  65 ++
 .../flink/runtime/query/KvStateLocation.java    |   8 +-
 .../flink/runtime/query/KvStateServer.java      |  19 +-
 .../runtime/query/QueryableStateUtils.java      |  54 +-
 .../query/netty/KvStateRequestStats.java        |   6 +-
 .../QueryableStateConfiguration.java            |   1 +
 .../taskexecutor/TaskManagerServices.java       |  16 +-
 .../TaskManagerServicesConfiguration.java       |   4 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  28 +-
 .../io/network/NetworkEnvironmentTest.java      |   1 +
 ...askManagerComponentsStartupShutdownTest.java |   1 +
 63 files changed, 4317 insertions(+), 4743 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
index 8b17cfb..df850e9 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/QueryableStateOptions.java
@@ -40,7 +40,7 @@ public class QueryableStateOptions {
 	/** Port to bind KvState server to (0 => pick random available port). */
 	public static final ConfigOption<Integer> SERVER_PORT =
 			key("query.server.port")
-			.defaultValue(0);
+			.defaultValue(9069);
 
 	/** Number of network (event loop) threads for the KvState server (0 => #slots). */
 	public static final ConfigOption<Integer> SERVER_NETWORK_THREADS =

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
index 397bb71..12d634d 100644
--- a/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
+++ b/flink-core/src/main/java/org/apache/flink/util/AbstractID.java
@@ -150,7 +150,7 @@ public class AbstractID implements Comparable<AbstractID>, java.io.Serializable
 				((int)  this.upperPart) ^
 				((int) (this.upperPart >>> 32));
 	}
-	
+
 	@Override
 	public String toString() {
 		if (this.toString == null) {
@@ -163,7 +163,7 @@ public class AbstractID implements Comparable<AbstractID>, java.io.Serializable
 
 		return this.toString;
 	}
-	
+
 	@Override
 	public int compareTo(AbstractID o) {
 		int diff1 = (this.upperPart < o.upperPart) ? -1 : ((this.upperPart == o.upperPart) ? 0 : 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
deleted file mode 100644
index 93f2ba5..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.queryablestate.client.KvStateLocationLookupService;
-
-/**
- * Exception to fail Future with if no JobManager is currently registered at
- * the {@link KvStateLocationLookupService}.
- */
-public class UnknownJobManager extends Exception {
-
-	private static final long serialVersionUID = 1L;
-
-	public UnknownJobManager() {
-		super("Unknown JobManager. Either the JobManager has not registered yet " +
-				"or has lost leadership.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
new file mode 100644
index 0000000..fa2604b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownJobManagerException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Exception to fail Future if the Task Manager on which the
+ * {@link org.apache.flink.runtime.query.KvStateClientProxy}
+ * is running does not know the active Job Manager.
+ */
+@Internal
+public class UnknownJobManagerException extends Exception {
+
+	private static final long serialVersionUID = 9092442511708951209L;
+
+	public UnknownJobManagerException() {
+		super("Unknown JobManager. Either the JobManager has not registered yet or has lost leadership.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
deleted file mode 100644
index e921e40..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespace.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-/**
- * Thrown if the KvState does not hold any state for the given key or namespace.
- */
-public class UnknownKeyOrNamespace extends IllegalStateException {
-
-	private static final long serialVersionUID = 1L;
-
-	public UnknownKeyOrNamespace() {
-		super("KvState does not hold any state for key/namespace.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
new file mode 100644
index 0000000..c497a72
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKeyOrNamespaceException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.BadRequestException;
+
+/**
+ * Thrown if the KvState does not hold any state for the given key or namespace.
+ */
+@Internal
+public class UnknownKeyOrNamespaceException extends BadRequestException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates the exception.
+	 * @param serverName the name of the server that threw the exception.
+	 */
+	public UnknownKeyOrNamespaceException(String serverName) {
+		super(serverName, "No state for the specified key/namespace.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
deleted file mode 100644
index d5ff828..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateID.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Thrown if no KvState with the given ID cannot found by the server handler.
- */
-public class UnknownKvStateID extends IllegalStateException {
-
-	private static final long serialVersionUID = 1L;
-
-	public UnknownKvStateID(KvStateID kvStateId) {
-		super("No KvState registered with ID " + Preconditions.checkNotNull(kvStateId, "KvStateID") +
-				" at TaskManager.");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
new file mode 100644
index 0000000..59ba081
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateIdException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.BadRequestException;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Thrown if no KvState with the given ID can be found by the server handler.
+ */
+@Internal
+public class UnknownKvStateIdException extends BadRequestException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates the exception.
+	 * @param serverName the name of the server that threw the exception.
+	 * @param kvStateId the state id for which no state was found.
+	 */
+	public UnknownKvStateIdException(String serverName, KvStateID kvStateId) {
+		super(serverName, "No registered state with ID " + Preconditions.checkNotNull(kvStateId) + '.');
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
deleted file mode 100644
index fd25fae..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocation.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate;
-
-import org.apache.flink.runtime.query.KvStateLocation;
-
-/**
- * Exception thrown if there is no location information available for the given
- * key group in a {@link KvStateLocation} instance.
- */
-public class UnknownKvStateKeyGroupLocation extends Exception {
-
-	private static final long serialVersionUID = 1L;
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
new file mode 100644
index 0000000..0d6588a
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/UnknownKvStateKeyGroupLocationException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.network.BadRequestException;
+import org.apache.flink.runtime.query.KvStateLocation;
+
+/**
+ * Exception thrown if there is no location information available for the given
+ * key group in a {@link KvStateLocation} instance.
+ */
+@Internal
+public class UnknownKvStateKeyGroupLocationException extends BadRequestException {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Creates the exception.
+	 * @param serverName the name of the server that threw the exception.
+	 */
+	public UnknownKvStateKeyGroupLocationException(String serverName) {
+		super(serverName, "Unknown key-group location.");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
deleted file mode 100644
index f42e008..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/AkkaKvStateLocationLookupService.java
+++ /dev/null
@@ -1,325 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.queryablestate.UnknownJobManager;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.util.Preconditions;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.Recover;
-import akka.pattern.Patterns;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.UUID;
-import java.util.concurrent.Callable;
-
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * Akka-based {@link KvStateLocationLookupService} that retrieves the current
- * JobManager address and uses it for lookups.
- */
-public class AkkaKvStateLocationLookupService implements KvStateLocationLookupService, LeaderRetrievalListener {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateLocationLookupService.class);
-
-	/** Future returned when no JobManager is available. */
-	private static final Future<ActorGateway> UNKNOWN_JOB_MANAGER = Futures.failed(new UnknownJobManager());
-
-	/** Leader retrieval service to retrieve the current job manager. */
-	private final LeaderRetrievalService leaderRetrievalService;
-
-	/** The actor system used to resolve the JobManager address. */
-	private final ActorSystem actorSystem;
-
-	/** Timeout for JobManager ask-requests. */
-	private final FiniteDuration askTimeout;
-
-	/** Retry strategy factory on future failures. */
-	private final LookupRetryStrategyFactory retryStrategyFactory;
-
-	/** Current job manager future. */
-	private volatile Future<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
-
-	/**
-	 * Creates the Akka-based {@link KvStateLocationLookupService}.
-	 *
-	 * @param leaderRetrievalService Leader retrieval service to use.
-	 * @param actorSystem            Actor system to use.
-	 * @param askTimeout             Timeout for JobManager ask-requests.
-	 * @param retryStrategyFactory   Retry strategy if no JobManager available.
-	 */
-	public AkkaKvStateLocationLookupService(
-			LeaderRetrievalService leaderRetrievalService,
-			ActorSystem actorSystem,
-			FiniteDuration askTimeout,
-			LookupRetryStrategyFactory retryStrategyFactory) {
-
-		this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService, "Leader retrieval service");
-		this.actorSystem = Preconditions.checkNotNull(actorSystem, "Actor system");
-		this.askTimeout = Preconditions.checkNotNull(askTimeout, "Ask Timeout");
-		this.retryStrategyFactory = Preconditions.checkNotNull(retryStrategyFactory, "Retry strategy factory");
-	}
-
-	public void start() {
-		try {
-			leaderRetrievalService.start(this);
-		} catch (Exception e) {
-			LOG.error("Failed to start leader retrieval service", e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void shutDown() {
-		try {
-			leaderRetrievalService.stop();
-		} catch (Exception e) {
-			LOG.error("Failed to stop leader retrieval service", e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public Future<KvStateLocation> getKvStateLookupInfo(final JobID jobId, final String registrationName) {
-		return getKvStateLookupInfo(jobId, registrationName, retryStrategyFactory.createRetryStrategy());
-	}
-
-	/**
-	 * Returns a future holding the {@link KvStateLocation} for the given job
-	 * and KvState registration name.
-	 *
-	 * <p>If there is currently no JobManager registered with the service, the
-	 * request is retried. The retry behaviour is specified by the
-	 * {@link LookupRetryStrategy} of the lookup service.
-	 *
-	 * @param jobId               JobID the KvState instance belongs to
-	 * @param registrationName    Name under which the KvState has been registered
-	 * @param lookupRetryStrategy Retry strategy to use for retries on UnknownJobManager failures.
-	 * @return Future holding the {@link KvStateLocation}
-	 */
-	@SuppressWarnings("unchecked")
-	private Future<KvStateLocation> getKvStateLookupInfo(
-			final JobID jobId,
-			final String registrationName,
-			final LookupRetryStrategy lookupRetryStrategy) {
-
-		return jobManagerFuture
-				.flatMap(new Mapper<ActorGateway, Future<Object>>() {
-					@Override
-					public Future<Object> apply(ActorGateway jobManager) {
-						// Lookup the KvStateLocation
-						Object msg = new KvStateMessage.LookupKvStateLocation(jobId, registrationName);
-						return jobManager.ask(msg, askTimeout);
-					}
-				}, actorSystem.dispatcher())
-				.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class))
-				.recoverWith(new Recover<Future<KvStateLocation>>() {
-					@Override
-					public Future<KvStateLocation> recover(Throwable failure) throws Throwable {
-						// If the Future fails with UnknownJobManager, retry
-						// the request. Otherwise all Futures will be failed
-						// during the start up phase, when the JobManager did
-						// not notify this service yet or leadership is lost
-						// intermittently.
-						if (failure instanceof UnknownJobManager && lookupRetryStrategy.tryRetry()) {
-							return Patterns.after(
-									lookupRetryStrategy.getRetryDelay(),
-									actorSystem.scheduler(),
-									actorSystem.dispatcher(),
-									new Callable<Future<KvStateLocation>>() {
-										@Override
-										public Future<KvStateLocation> call() throws Exception {
-											return getKvStateLookupInfo(
-													jobId,
-													registrationName,
-													lookupRetryStrategy);
-										}
-									});
-						} else {
-							return Futures.failed(failure);
-						}
-					}
-				}, actorSystem.dispatcher());
-	}
-
-	@Override
-	public void notifyLeaderAddress(String leaderAddress, final UUID leaderSessionID) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Received leader address notification {}:{}", leaderAddress, leaderSessionID);
-		}
-
-		if (leaderAddress == null) {
-			jobManagerFuture = UNKNOWN_JOB_MANAGER;
-		} else {
-			jobManagerFuture = AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, askTimeout)
-					.map(new Mapper<ActorRef, ActorGateway>() {
-						@Override
-						public ActorGateway apply(ActorRef actorRef) {
-							return new AkkaActorGateway(actorRef, leaderSessionID);
-						}
-					}, actorSystem.dispatcher());
-		}
-	}
-
-	@Override
-	public void handleError(Exception exception) {
-		jobManagerFuture = Futures.failed(exception);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Retry strategy for failed lookups.
-	 *
-	 * <p>Usage:
-	 * <pre>
-	 * LookupRetryStrategy retryStrategy = retryStrategyFactory.createRetryStrategy();
-	 *
-	 * if (retryStrategy.tryRetry()) {
-	 *     // OK to retry
-	 *     FiniteDuration retryDelay = retryStrategy.getRetryDelay();
-	 * }
-	 * </pre>
-	 */
-	public interface LookupRetryStrategy {
-
-		/**
-		 * Returns the current retry delay.
-		 *
-		 * @return Current retry delay.
-		 */
-		FiniteDuration getRetryDelay();
-
-		/**
-		 * Tries another retry and returns whether it is allowed or not.
-		 *
-		 * @return Whether it is allowed to do another retry or not.
-		 */
-		boolean tryRetry();
-
-	}
-
-	/**
-	 * Factory for retry strategies.
-	 */
-	public interface LookupRetryStrategyFactory {
-
-		/**
-		 * Creates a new retry strategy.
-		 *
-		 * @return The retry strategy.
-		 */
-		LookupRetryStrategy createRetryStrategy();
-
-	}
-
-	/**
-	 * Factory for disabled retries.
-	 */
-	public static class DisabledLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
-
-		private static final DisabledLookupRetryStrategy RETRY_STRATEGY = new DisabledLookupRetryStrategy();
-
-		@Override
-		public LookupRetryStrategy createRetryStrategy() {
-			return RETRY_STRATEGY;
-		}
-
-		private static class DisabledLookupRetryStrategy implements LookupRetryStrategy {
-
-			@Override
-			public FiniteDuration getRetryDelay() {
-				return FiniteDuration.Zero();
-			}
-
-			@Override
-			public boolean tryRetry() {
-				return false;
-			}
-		}
-
-	}
-
-	/**
-	 * Factory for fixed delay retries.
-	 */
-	public static class FixedDelayLookupRetryStrategyFactory implements LookupRetryStrategyFactory {
-
-		private final int maxRetries;
-		private final FiniteDuration retryDelay;
-
-		FixedDelayLookupRetryStrategyFactory(int maxRetries, FiniteDuration retryDelay) {
-			this.maxRetries = maxRetries;
-			this.retryDelay = retryDelay;
-		}
-
-		@Override
-		public LookupRetryStrategy createRetryStrategy() {
-			return new FixedDelayLookupRetryStrategy(maxRetries, retryDelay);
-		}
-
-		private static class FixedDelayLookupRetryStrategy implements LookupRetryStrategy {
-
-			private final Object retryLock = new Object();
-			private final int maxRetries;
-			private final FiniteDuration retryDelay;
-			private int numRetries;
-
-			public FixedDelayLookupRetryStrategy(int maxRetries, FiniteDuration retryDelay) {
-				Preconditions.checkArgument(maxRetries >= 0, "Negative number maximum retries");
-				this.maxRetries = maxRetries;
-				this.retryDelay = Preconditions.checkNotNull(retryDelay, "Retry delay");
-			}
-
-			@Override
-			public FiniteDuration getRetryDelay() {
-				synchronized (retryLock) {
-					return retryDelay;
-				}
-			}
-
-			@Override
-			public boolean tryRetry() {
-				synchronized (retryLock) {
-					if (numRetries < maxRetries) {
-						numRetries++;
-						return true;
-					} else {
-						return false;
-					}
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
deleted file mode 100644
index d456cd7..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClient.java
+++ /dev/null
@@ -1,583 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.queryablestate.UnknownKeyOrNamespace;
-import org.apache.flink.queryablestate.UnknownKvStateID;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
-import org.apache.flink.runtime.query.KvStateID;
-import org.apache.flink.runtime.query.KvStateServer;
-import org.apache.flink.runtime.query.KvStateServerAddress;
-import org.apache.flink.runtime.query.netty.KvStateRequestStats;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.flink.shaded.netty4.io.netty.bootstrap.Bootstrap;
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOption;
-import org.apache.flink.shaded.netty4.io.netty.channel.EventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
-
-import akka.dispatch.Futures;
-
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayDeque;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-/**
- * Netty-based client querying {@link KvStateServer} instances.
- *
- * <p>This client can be used by multiple threads concurrently. Operations are
- * executed asynchronously and return Futures to their result.
- *
- * <p>The incoming pipeline looks as follows:
- * <pre>
- * Socket.read() -> LengthFieldBasedFrameDecoder -> KvStateClientHandler
- * </pre>
- *
- * <p>Received binary messages are expected to contain a frame length field. Netty's
- * {@link LengthFieldBasedFrameDecoder} is used to fully receive the frame before
- * giving it to our {@link KvStateClientHandler}.
- *
- * <p>Connections are established and closed by the client. The server only
- * closes the connection on a fatal failure that cannot be recovered.
- */
-public class KvStateClient {
-
-	/** Netty's Bootstrap. */
-	private final Bootstrap bootstrap;
-
-	/** Statistics tracker. */
-	private final KvStateRequestStats stats;
-
-	/** Established connections. */
-	private final ConcurrentHashMap<KvStateServerAddress, EstablishedConnection> establishedConnections =
-			new ConcurrentHashMap<>();
-
-	/** Pending connections. */
-	private final ConcurrentHashMap<KvStateServerAddress, PendingConnection> pendingConnections =
-			new ConcurrentHashMap<>();
-
-	/** Atomic shut down flag. */
-	private final AtomicBoolean shutDown = new AtomicBoolean();
-
-	/**
-	 * Creates a client with the specified number of event loop threads.
-	 *
-	 * @param numEventLoopThreads Number of event loop threads (minimum 1).
-	 */
-	public KvStateClient(int numEventLoopThreads, KvStateRequestStats stats) {
-		Preconditions.checkArgument(numEventLoopThreads >= 1, "Non-positive number of event loop threads.");
-		NettyBufferPool bufferPool = new NettyBufferPool(numEventLoopThreads);
-
-		ThreadFactory threadFactory = new ThreadFactoryBuilder()
-				.setDaemon(true)
-				.setNameFormat("Flink KvStateClient Event Loop Thread %d")
-				.build();
-
-		NioEventLoopGroup nioGroup = new NioEventLoopGroup(numEventLoopThreads, threadFactory);
-
-		this.bootstrap = new Bootstrap()
-				.group(nioGroup)
-				.channel(NioSocketChannel.class)
-				.option(ChannelOption.ALLOCATOR, bufferPool)
-				.handler(new ChannelInitializer<SocketChannel>() {
-					@Override
-					protected void initChannel(SocketChannel ch) throws Exception {
-						ch.pipeline()
-								.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
-								// ChunkedWriteHandler respects Channel writability
-								.addLast(new ChunkedWriteHandler());
-					}
-				});
-
-		this.stats = Preconditions.checkNotNull(stats, "Statistics tracker");
-	}
-
-	/**
-	 * Returns a future holding the serialized request result.
-	 *
-	 * <p>If the server does not serve a KvState instance with the given ID,
-	 * the Future will be failed with a {@link UnknownKvStateID}.
-	 *
-	 * <p>If the KvState instance does not hold any data for the given key
-	 * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}.
-	 *
-	 * <p>All other failures are forwarded to the Future.
-	 *
-	 * @param serverAddress Address of the server to query
-	 * @param kvStateId ID of the KvState instance to query
-	 * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance with
-	 * @return Future holding the serialized result
-	 */
-	public Future<byte[]> getKvState(
-			KvStateServerAddress serverAddress,
-			KvStateID kvStateId,
-			byte[] serializedKeyAndNamespace) {
-
-		if (shutDown.get()) {
-			return Futures.failed(new IllegalStateException("Shut down"));
-		}
-
-		EstablishedConnection connection = establishedConnections.get(serverAddress);
-
-		if (connection != null) {
-			return connection.getKvState(kvStateId, serializedKeyAndNamespace);
-		} else {
-			PendingConnection pendingConnection = pendingConnections.get(serverAddress);
-			if (pendingConnection != null) {
-				// There was a race, use the existing pending connection.
-				return pendingConnection.getKvState(kvStateId, serializedKeyAndNamespace);
-			} else {
-				// We try to connect to the server.
-				PendingConnection pending = new PendingConnection(serverAddress);
-				PendingConnection previous = pendingConnections.putIfAbsent(serverAddress, pending);
-
-				if (previous == null) {
-					// OK, we are responsible to connect.
-					bootstrap.connect(serverAddress.getHost(), serverAddress.getPort())
-							.addListener(pending);
-
-					return pending.getKvState(kvStateId, serializedKeyAndNamespace);
-				} else {
-					// There was a race, use the existing pending connection.
-					return previous.getKvState(kvStateId, serializedKeyAndNamespace);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Shuts down the client and closes all connections.
-	 *
-	 * <p>After a call to this method, all returned futures will be failed.
-	 */
-	public void shutDown() {
-		if (shutDown.compareAndSet(false, true)) {
-			for (Map.Entry<KvStateServerAddress, EstablishedConnection> conn : establishedConnections.entrySet()) {
-				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
-					conn.getValue().close();
-				}
-			}
-
-			for (Map.Entry<KvStateServerAddress, PendingConnection> conn : pendingConnections.entrySet()) {
-				if (pendingConnections.remove(conn.getKey()) != null) {
-					conn.getValue().close();
-				}
-			}
-
-			if (bootstrap != null) {
-				EventLoopGroup group = bootstrap.group();
-				if (group != null) {
-					group.shutdownGracefully(0, 10, TimeUnit.SECONDS);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Closes the connection to the given server address if it exists.
-	 *
-	 * <p>If there is a request to the server a new connection will be established.
-	 *
-	 * @param serverAddress Target address of the connection to close
-	 */
-	public void closeConnection(KvStateServerAddress serverAddress) {
-		PendingConnection pending = pendingConnections.get(serverAddress);
-		if (pending != null) {
-			pending.close();
-		}
-
-		EstablishedConnection established = establishedConnections.remove(serverAddress);
-		if (established != null) {
-			established.close();
-		}
-	}
-
-	/**
-	 * A pending connection that is in the process of connecting.
-	 */
-	private class PendingConnection implements ChannelFutureListener {
-
-		/** Lock to guard the connect call, channel hand in, etc. */
-		private final Object connectLock = new Object();
-
-		/** Address of the server we are connecting to. */
-		private final KvStateServerAddress serverAddress;
-
-		/** Queue of requests while connecting. */
-		private final ArrayDeque<PendingRequest> queuedRequests = new ArrayDeque<>();
-
-		/** The established connection after the connect succeeds. */
-		private EstablishedConnection established;
-
-		/** Closed flag. */
-		private boolean closed;
-
-		/** Failure cause if something goes wrong. */
-		private Throwable failureCause;
-
-		/**
-		 * Creates a pending connection to the given server.
-		 *
-		 * @param serverAddress Address of the server to connect to.
-		 */
-		private PendingConnection(KvStateServerAddress serverAddress) {
-			this.serverAddress = serverAddress;
-		}
-
-		@Override
-		public void operationComplete(ChannelFuture future) throws Exception {
-			// Callback from the Bootstrap's connect call.
-			if (future.isSuccess()) {
-				handInChannel(future.channel());
-			} else {
-				close(future.cause());
-			}
-		}
-
-		/**
-		 * Returns a future holding the serialized request result.
-		 *
-		 * <p>If the channel has been established, forward the call to the
-		 * established channel, otherwise queue it for when the channel is
-		 * handed in.
-		 *
-		 * @param kvStateId                 ID of the KvState instance to query
-		 * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
-		 *                                  with
-		 * @return Future holding the serialized result
-		 */
-		public Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
-			synchronized (connectLock) {
-				if (failureCause != null) {
-					return Futures.failed(failureCause);
-				} else if (closed) {
-					return Futures.failed(new ClosedChannelException());
-				} else {
-					if (established != null) {
-						return established.getKvState(kvStateId, serializedKeyAndNamespace);
-					} else {
-						// Queue this and handle when connected
-						PendingRequest pending = new PendingRequest(kvStateId, serializedKeyAndNamespace);
-						queuedRequests.add(pending);
-						return pending.promise.future();
-					}
-				}
-			}
-		}
-
-		/**
-		 * Hands in a channel after a successful connection.
-		 *
-		 * @param channel Channel to hand in
-		 */
-		private void handInChannel(Channel channel) {
-			synchronized (connectLock) {
-				if (closed || failureCause != null) {
-					// Close the channel and we are done. Any queued requests
-					// are removed on the close/failure call and after that no
-					// new ones can be enqueued.
-					channel.close();
-				} else {
-					established = new EstablishedConnection(serverAddress, channel);
-
-					PendingRequest pending;
-					while ((pending = queuedRequests.poll()) != null) {
-						Future<byte[]> resultFuture = established.getKvState(
-								pending.kvStateId,
-								pending.serializedKeyAndNamespace);
-
-						pending.promise.completeWith(resultFuture);
-					}
-
-					// Publish the channel for the general public
-					establishedConnections.put(serverAddress, established);
-					pendingConnections.remove(serverAddress);
-
-					// Check shut down for possible race with shut down. We
-					// don't want any lingering connections after shut down,
-					// which can happen if we don't check this here.
-					if (shutDown.get()) {
-						if (establishedConnections.remove(serverAddress, established)) {
-							established.close();
-						}
-					}
-				}
-			}
-		}
-
-		/**
-		 * Close the connecting channel with a ClosedChannelException.
-		 */
-		private void close() {
-			close(new ClosedChannelException());
-		}
-
-		/**
-		 * Close the connecting channel with an Exception (can be
-		 * <code>null</code>) or forward to the established channel.
-		 */
-		private void close(Throwable cause) {
-			synchronized (connectLock) {
-				if (!closed) {
-					if (failureCause == null) {
-						failureCause = cause;
-					}
-
-					if (established != null) {
-						established.close();
-					} else {
-						PendingRequest pending;
-						while ((pending = queuedRequests.poll()) != null) {
-							pending.promise.tryFailure(cause);
-						}
-					}
-
-					closed = true;
-				}
-			}
-		}
-
-		/**
-		 * A pending request queued while the channel is connecting.
-		 */
-		private final class PendingRequest {
-
-			private final KvStateID kvStateId;
-			private final byte[] serializedKeyAndNamespace;
-			private final Promise<byte[]> promise;
-
-			private PendingRequest(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
-				this.kvStateId = kvStateId;
-				this.serializedKeyAndNamespace = serializedKeyAndNamespace;
-				this.promise = Futures.promise();
-			}
-		}
-
-		@Override
-		public String toString() {
-			synchronized (connectLock) {
-				return "PendingConnection{" +
-						"serverAddress=" + serverAddress +
-						", queuedRequests=" + queuedRequests.size() +
-						", established=" + (established != null) +
-						", closed=" + closed +
-						'}';
-			}
-		}
-	}
-
-	/**
-	 * An established connection that wraps the actual channel instance and is
-	 * registered at the {@link KvStateClientHandler} for callbacks.
-	 */
-	private class EstablishedConnection implements KvStateClientHandlerCallback {
-
-		/** Address of the server we are connected to. */
-		private final KvStateServerAddress serverAddress;
-
-		/** The actual TCP channel. */
-		private final Channel channel;
-
-		/** Pending requests keyed by request ID. */
-		private final ConcurrentHashMap<Long, PromiseAndTimestamp> pendingRequests = new ConcurrentHashMap<>();
-
-		/** Current request number used to assign unique request IDs. */
-		private final AtomicLong requestCount = new AtomicLong();
-
-		/** Reference to a failure that was reported by the channel. */
-		private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
-
-		/**
-		 * Creates an established connection with the given channel.
-		 *
-		 * @param serverAddress Address of the server connected to
-		 * @param channel The actual TCP channel
-		 */
-		EstablishedConnection(KvStateServerAddress serverAddress, Channel channel) {
-			this.serverAddress = Preconditions.checkNotNull(serverAddress, "KvStateServerAddress");
-			this.channel = Preconditions.checkNotNull(channel, "Channel");
-
-			// Add the client handler with the callback
-			channel.pipeline().addLast("KvStateClientHandler", new KvStateClientHandler(this));
-
-			stats.reportActiveConnection();
-		}
-
-		/**
-		 * Close the channel with a ClosedChannelException.
-		 */
-		void close() {
-			close(new ClosedChannelException());
-		}
-
-		/**
-		 * Close the channel with a cause.
-		 *
-		 * @param cause The cause to close the channel with.
-		 * @return {@code true} if this call closed the channel; {@code false} if a failure cause was already set
-		 */
-		private boolean close(Throwable cause) {
-			if (failureCause.compareAndSet(null, cause)) {
-				channel.close();
-				stats.reportInactiveConnection();
-
-				for (long requestId : pendingRequests.keySet()) {
-					PromiseAndTimestamp pending = pendingRequests.remove(requestId);
-					if (pending != null && pending.promise.tryFailure(cause)) {
-						stats.reportFailedRequest();
-					}
-				}
-
-				return true;
-			}
-
-			return false;
-		}
-
-		/**
-		 * Returns a future holding the serialized request result.
-		 *
-		 * @param kvStateId                 ID of the KvState instance to query
-		 * @param serializedKeyAndNamespace Serialized key and namespace to query KvState instance
-		 *                                  with
-		 * @return Future holding the serialized result
-		 */
-		Future<byte[]> getKvState(KvStateID kvStateId, byte[] serializedKeyAndNamespace) {
-			PromiseAndTimestamp requestPromiseTs = new PromiseAndTimestamp(
-					Futures.<byte[]>promise(),
-					System.nanoTime());
-
-			try {
-				final long requestId = requestCount.getAndIncrement();
-				pendingRequests.put(requestId, requestPromiseTs);
-
-				stats.reportRequest();
-
-				ByteBuf buf = MessageSerializer.serializeKvStateRequest(
-						channel.alloc(),
-						requestId,
-						kvStateId,
-						serializedKeyAndNamespace);
-
-				channel.writeAndFlush(buf).addListener(new ChannelFutureListener() {
-					@Override
-					public void operationComplete(ChannelFuture future) throws Exception {
-						if (!future.isSuccess()) {
-							// Fail the promise if the write failed
-							PromiseAndTimestamp pending = pendingRequests.remove(requestId);
-							if (pending != null && pending.promise.tryFailure(future.cause())) {
-								stats.reportFailedRequest();
-							}
-						}
-					}
-				});
-
-				// Check failure for possible race. We don't want any lingering
-				// promises after a failure, which can happen if we don't check
-				// this here. Note that close is treated as a failure as well.
-				Throwable failure = failureCause.get();
-				if (failure != null) {
-					// Remove from pending requests to guard against concurrent
-					// removal and to make sure that we only count it once as failed.
-					PromiseAndTimestamp p = pendingRequests.remove(requestId);
-					if (p != null && p.promise.tryFailure(failure)) {
-						stats.reportFailedRequest();
-					}
-				}
-			} catch (Throwable t) {
-				requestPromiseTs.promise.tryFailure(t);
-			}
-
-			return requestPromiseTs.promise.future();
-		}
-
-		@Override
-		public void onRequestResult(long requestId, byte[] serializedValue) {
-			PromiseAndTimestamp pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.promise.trySuccess(serializedValue)) {
-				long durationMillis = (System.nanoTime() - pending.timestamp) / 1_000_000;
-				stats.reportSuccessfulRequest(durationMillis);
-			}
-		}
-
-		@Override
-		public void onRequestFailure(long requestId, Throwable cause) {
-			PromiseAndTimestamp pending = pendingRequests.remove(requestId);
-			if (pending != null && pending.promise.tryFailure(cause)) {
-				stats.reportFailedRequest();
-			}
-		}
-
-		@Override
-		public void onFailure(Throwable cause) {
-			if (close(cause)) {
-				// Remove from established channels, otherwise future
-				// requests will be handled by this failed channel.
-				establishedConnections.remove(serverAddress, this);
-			}
-		}
-
-		@Override
-		public String toString() {
-			return "EstablishedConnection{" +
-					"serverAddress=" + serverAddress +
-					", channel=" + channel +
-					", pendingRequests=" + pendingRequests.size() +
-					", requestCount=" + requestCount +
-					", failureCause=" + failureCause +
-					'}';
-		}
-
-		/**
-		 * Pair of promise and a timestamp.
-		 */
-		private class PromiseAndTimestamp {
-
-			private final Promise<byte[]> promise;
-			private final long timestamp;
-
-			public PromiseAndTimestamp(Promise<byte[]> promise, long timestamp) {
-				this.promise = promise;
-				this.timestamp = timestamp;
-			}
-		}
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
deleted file mode 100644
index 36a2b31..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
-import org.apache.flink.queryablestate.messages.KvStateRequestResult;
-import org.apache.flink.queryablestate.network.messages.MessageSerializer;
-import org.apache.flink.queryablestate.network.messages.MessageType;
-import org.apache.flink.queryablestate.server.KvStateServerHandler;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.channels.ClosedChannelException;
-
-/**
- * This handler expects responses from {@link KvStateServerHandler}.
- *
- * <p>It deserializes the response and calls the registered callback, which is
- * responsible for actually handling the result (see {@link KvStateClient.EstablishedConnection}).
- */
-public class KvStateClientHandler extends ChannelInboundHandlerAdapter {
-
-	private static final Logger LOG = LoggerFactory.getLogger(KvStateClientHandler.class);
-
-	private final KvStateClientHandlerCallback callback;
-
-	/**
-	 * Creates a {@link KvStateClientHandler} with the callback.
-	 *
-	 * @param callback Callback for responses.
-	 */
-	public KvStateClientHandler(KvStateClientHandlerCallback callback) {
-		this.callback = callback;
-	}
-
-	@Override
-	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-		try {
-			ByteBuf buf = (ByteBuf) msg;
-			MessageType msgType = MessageSerializer.deserializeHeader(buf);
-
-			if (msgType == MessageType.REQUEST_RESULT) {
-				KvStateRequestResult result = MessageSerializer.deserializeKvStateRequestResult(buf);
-				callback.onRequestResult(result.getRequestId(), result.getSerializedResult());
-			} else if (msgType == MessageType.REQUEST_FAILURE) {
-				KvStateRequestFailure failure = MessageSerializer.deserializeKvStateRequestFailure(buf);
-				callback.onRequestFailure(failure.getRequestId(), failure.getCause());
-			} else if (msgType == MessageType.SERVER_FAILURE) {
-				throw MessageSerializer.deserializeServerFailure(buf);
-			} else {
-				throw new IllegalStateException("Unexpected response type '" + msgType + "'");
-			}
-		} catch (Throwable t1) {
-			try {
-				callback.onFailure(t1);
-			} catch (Throwable t2) {
-				LOG.error("Failed to notify callback about failure", t2);
-			}
-		} finally {
-			ReferenceCountUtil.release(msg);
-		}
-	}
-
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-		try {
-			callback.onFailure(cause);
-		} catch (Throwable t) {
-			LOG.error("Failed to notify callback about failure", t);
-		}
-	}
-
-	@Override
-	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-		// Only the client is expected to close the channel. Otherwise it
-		// indicates a failure. Note that this will be invoked in both cases
-		// though. If the callback closed the channel, the callback must be
-		// ignored.
-		try {
-			callback.onFailure(new ClosedChannelException());
-		} catch (Throwable t) {
-			LOG.error("Failed to notify callback about failure", t);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
deleted file mode 100644
index 98718fa..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateClientHandlerCallback.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.queryablestate.messages.KvStateRequest;
-
-/**
- * Callback for {@link KvStateClientHandler}.
- */
-public interface KvStateClientHandlerCallback {
-
-	/**
-	 * Called on a successful {@link KvStateRequest}.
-	 *
-	 * @param requestId       ID of the request
-	 * @param serializedValue Serialized value for the request
-	 */
-	void onRequestResult(long requestId, byte[] serializedValue);
-
-	/**
-	 * Called on a failed {@link KvStateRequest}.
-	 *
-	 * @param requestId ID of the request
-	 * @param cause     Cause of the request failure
-	 */
-	void onRequestFailure(long requestId, Throwable cause);
-
-	/**
-	 * Called on any failure, which is not related to a specific request.
-	 *
-	 * <p>This can be for example a caught Exception in the channel pipeline
-	 * or an unexpected channel close.
-	 *
-	 * @param cause Cause of the failure
-	 */
-	void onFailure(Throwable cause);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f48f5340/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java b/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
deleted file mode 100644
index 635cbae..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/main/java/org/apache/flink/queryablestate/client/KvStateLocationLookupService.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.client;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.query.KvStateLocation;
-
-import scala.concurrent.Future;
-
-/**
- * {@link KvStateLocation} lookup service.
- */
-public interface KvStateLocationLookupService {
-
-	/**
-	 * Starts the lookup service.
-	 */
-	void start();
-
-	/**
-	 * Shuts down the lookup service.
-	 */
-	void shutDown();
-
-	/**
-	 * Returns a future holding the {@link KvStateLocation} for the given job
-	 * and KvState registration name.
-	 *
-	 * @param jobId            JobID the KvState instance belongs to
-	 * @param registrationName Name under which the KvState has been registered
-	 * @return Future holding the {@link KvStateLocation}
-	 */
-	Future<KvStateLocation> getKvStateLookupInfo(JobID jobId, String registrationName);
-
-}