You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:02:01 UTC

[12/13] flink git commit: [FLINK-7908][QS] Restructure the queryable state module.

[FLINK-7908][QS] Restructure the queryable state module.

The QS module is split into core and client. The core should
be put in the lib folder to enable queryable state, while the
client is the one that the user will program against. The
reason for the restructuring is mainly to remove the dependency
on the flink-runtime from the user's program.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c771505
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c771505
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c771505

Branch: refs/heads/master
Commit: 0c771505b84cdacf7a359c3be0efe38a30f9e660
Parents: 8595dad
Author: kkloudas <kk...@gmail.com>
Authored: Tue Oct 24 12:16:08 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Oct 26 18:57:34 2017 +0200

----------------------------------------------------------------------
 .../streaming/state/AbstractRocksDBState.java   |    2 +-
 .../streaming/state/RocksDBMapState.java        |    2 +-
 .../flink-queryable-state-client-java/pom.xml   |   90 ++
 .../flink/queryablestate/FutureUtils.java       |   43 +
 .../apache/flink/queryablestate/KvStateID.java  |   40 +
 .../client/QueryableStateClient.java            |  257 +++
 .../queryablestate/client/VoidNamespace.java    |   75 +
 .../client/VoidNamespaceSerializer.java         |   96 ++
 .../client/VoidNamespaceTypeInfo.java           |   92 ++
 .../client/state/ImmutableAggregatingState.java |   71 +
 .../client/state/ImmutableFoldingState.java     |   70 +
 .../client/state/ImmutableListState.java        |   70 +
 .../client/state/ImmutableMapState.java         |  139 ++
 .../client/state/ImmutableReducingState.java    |   69 +
 .../client/state/ImmutableState.java            |   29 +
 .../client/state/ImmutableStateBinder.java      |   80 +
 .../client/state/ImmutableValueState.java       |   69 +
 .../serialization/DataInputDeserializer.java    |  392 +++++
 .../serialization/DataOutputSerializer.java     |  344 ++++
 .../state/serialization/KvStateSerializer.java  |  265 ++++
 .../exceptions/UnknownJobManagerException.java  |   36 +
 .../UnknownKeyOrNamespaceException.java         |   39 +
 .../exceptions/UnknownKvStateIdException.java   |   42 +
 ...UnknownKvStateKeyGroupLocationException.java |   39 +
 .../queryablestate/messages/KvStateRequest.java |  140 ++
 .../messages/KvStateResponse.java               |   74 +
 .../network/AbstractServerBase.java             |  308 ++++
 .../network/AbstractServerHandler.java          |  305 ++++
 .../network/BadRequestException.java            |   35 +
 .../queryablestate/network/ChunkedByteBuf.java  |  100 ++
 .../flink/queryablestate/network/Client.java    |  536 +++++++
 .../queryablestate/network/ClientHandler.java   |  122 ++
 .../network/ClientHandlerCallback.java          |   56 +
 .../queryablestate/network/NettyBufferPool.java |  171 ++
 .../network/messages/MessageBody.java           |   38 +
 .../network/messages/MessageDeserializer.java   |   39 +
 .../network/messages/MessageSerializer.java     |  320 ++++
 .../network/messages/MessageType.java           |   42 +
 .../network/messages/RequestFailure.java        |   71 +
 .../stats/AtomicKvStateRequestStats.java        |  104 ++
 .../stats/DisabledKvStateRequestStats.java      |   45 +
 .../network/stats/KvStateRequestStats.java      |   54 +
 .../client/VoidNamespaceTypeInfoTest.java       |   32 +
 .../state/ImmutableAggregatingStateTest.java    |  107 ++
 .../client/state/ImmutableFoldingStateTest.java |   93 ++
 .../client/state/ImmutableListStateTest.java    |  110 ++
 .../client/state/ImmutableMapStateTest.java     |  188 +++
 .../state/ImmutableReducingStateTest.java       |   83 +
 .../client/state/ImmutableValueStateTest.java   |   69 +
 .../src/test/resources/log4j-test.properties    |   31 +
 .../flink-queryable-state-java/pom.xml          |  137 --
 .../UnknownJobManagerException.java             |   36 -
 .../UnknownKeyOrNamespaceException.java         |   39 -
 .../UnknownKvStateIdException.java              |   42 -
 ...UnknownKvStateKeyGroupLocationException.java |   41 -
 .../client/QueryableStateClient.java            |  260 ---
 .../client/proxy/KvStateClientProxyHandler.java |  225 ---
 .../client/proxy/KvStateClientProxyImpl.java    |  128 --
 .../client/state/ImmutableAggregatingState.java |   71 -
 .../client/state/ImmutableFoldingState.java     |   70 -
 .../client/state/ImmutableListState.java        |   70 -
 .../client/state/ImmutableMapState.java         |  139 --
 .../client/state/ImmutableReducingState.java    |   69 -
 .../client/state/ImmutableState.java            |   29 -
 .../client/state/ImmutableStateBinder.java      |   80 -
 .../client/state/ImmutableValueState.java       |   69 -
 .../messages/KvStateInternalRequest.java        |   93 --
 .../queryablestate/messages/KvStateRequest.java |  141 --
 .../messages/KvStateResponse.java               |   75 -
 .../network/AbstractServerBase.java             |  310 ----
 .../network/AbstractServerHandler.java          |  306 ----
 .../network/BadRequestException.java            |   35 -
 .../queryablestate/network/ChunkedByteBuf.java  |  100 --
 .../flink/queryablestate/network/Client.java    |  537 -------
 .../queryablestate/network/ClientHandler.java   |  122 --
 .../network/ClientHandlerCallback.java          |   56 -
 .../network/messages/MessageBody.java           |   38 -
 .../network/messages/MessageDeserializer.java   |   39 -
 .../network/messages/MessageSerializer.java     |  320 ----
 .../network/messages/MessageType.java           |   42 -
 .../network/messages/RequestFailure.java        |   71 -
 .../server/KvStateServerHandler.java            |  107 --
 .../server/KvStateServerImpl.java               |  111 --
 .../itcases/AbstractQueryableStateTestBase.java | 1496 ------------------
 .../HAAbstractQueryableStateTestBase.java       |   98 --
 .../HAQueryableStateFsBackendITCase.java        |   45 -
 .../HAQueryableStateRocksDBBackendITCase.java   |   47 -
 .../KVStateRequestSerializerRocksDBTest.java    |  167 --
 .../NonHAAbstractQueryableStateTestBase.java    |   78 -
 .../NonHAQueryableStateFsBackendITCase.java     |   45 -
 ...NonHAQueryableStateRocksDBBackendITCase.java |   47 -
 .../network/AbstractServerTest.java             |  219 ---
 .../queryablestate/network/ClientTest.java      |  784 ---------
 .../network/KvStateClientHandlerTest.java       |  119 --
 .../network/KvStateServerHandlerTest.java       |  758 ---------
 .../network/KvStateServerTest.java              |  207 ---
 .../network/MessageSerializerTest.java          |  220 ---
 .../state/ImmutableAggregatingStateTest.java    |  108 --
 .../state/ImmutableFoldingStateTest.java        |   94 --
 .../state/ImmutableListStateTest.java           |  112 --
 .../state/ImmutableMapStateTest.java            |  189 ---
 .../state/ImmutableReducingStateTest.java       |   84 -
 .../state/ImmutableValueStateTest.java          |   70 -
 .../src/test/resources/log4j-test.properties    |   31 -
 .../flink-queryable-state-runtime/pom.xml       |  119 ++
 .../client/proxy/KvStateClientProxyHandler.java |  225 +++
 .../client/proxy/KvStateClientProxyImpl.java    |  128 ++
 .../messages/KvStateInternalRequest.java        |   93 ++
 .../server/KvStateServerHandler.java            |  107 ++
 .../server/KvStateServerImpl.java               |  111 ++
 .../itcases/AbstractQueryableStateTestBase.java | 1496 ++++++++++++++++++
 .../HAAbstractQueryableStateTestBase.java       |   98 ++
 .../HAQueryableStateFsBackendITCase.java        |   45 +
 .../HAQueryableStateRocksDBBackendITCase.java   |   47 +
 .../KVStateRequestSerializerRocksDBTest.java    |  167 ++
 .../NonHAAbstractQueryableStateTestBase.java    |   78 +
 .../NonHAQueryableStateFsBackendITCase.java     |   45 +
 ...NonHAQueryableStateRocksDBBackendITCase.java |   47 +
 .../network/AbstractServerTest.java             |  219 +++
 .../queryablestate/network/ClientTest.java      |  782 +++++++++
 .../network/KvStateClientHandlerTest.java       |  119 ++
 .../network/KvStateRequestSerializerTest.java   |  416 +++++
 .../network/KvStateServerHandlerTest.java       |  758 +++++++++
 .../network/KvStateServerTest.java              |  212 +++
 .../network/MessageSerializerTest.java          |  220 +++
 .../src/test/resources/log4j-test.properties    |   31 +
 flink-queryable-state/pom.xml                   |    5 +-
 flink-runtime/pom.xml                           |    6 +
 .../flink/runtime/jobmaster/JobMaster.java      |    6 +-
 .../runtime/jobmaster/JobMasterGateway.java     |    6 +-
 .../apache/flink/runtime/query/KvStateID.java   |   41 -
 .../flink/runtime/query/KvStateLocation.java    |   14 +-
 .../runtime/query/KvStateLocationRegistry.java  |    4 +-
 .../flink/runtime/query/KvStateMessage.java     |   10 +-
 .../flink/runtime/query/KvStateRegistry.java    |    1 +
 .../runtime/query/KvStateRegistryGateway.java   |    5 +-
 .../runtime/query/KvStateRegistryListener.java  |    1 +
 .../flink/runtime/query/KvStateServer.java      |    6 +-
 .../runtime/query/KvStateServerAddress.java     |   95 --
 .../runtime/query/QueryableStateUtils.java      |    8 +-
 .../runtime/query/TaskKvStateRegistry.java      |    1 +
 .../query/netty/AtomicKvStateRequestStats.java  |  104 --
 .../netty/DisabledKvStateRequestStats.java      |   45 -
 .../query/netty/KvStateRequestStats.java        |   55 -
 .../query/netty/message/KvStateSerializer.java  |  267 ----
 .../runtime/state/heap/AbstractHeapState.java   |    6 +-
 .../flink/runtime/state/heap/HeapMapState.java  |    2 +-
 .../taskexecutor/TaskManagerServices.java       |    2 +-
 .../rpc/RpcKvStateRegistryListener.java         |    9 +-
 .../ActorGatewayKvStateRegistryListener.java    |   11 +-
 .../runtime/jobmanager/JobManagerTest.java      |   12 +-
 .../query/KvStateLocationRegistryTest.java      |   10 +-
 .../runtime/query/KvStateLocationTest.java      |    8 +-
 .../message/KvStateRequestSerializerTest.java   |  415 -----
 .../runtime/state/StateBackendTestBase.java     |    4 +-
 .../completeness/TypeInfoTestCoverageTest.java  |    3 +-
 pom.xml                                         |    2 +-
 tools/travis_mvn_watchdog.sh                    |    3 +-
 158 files changed, 11373 insertions(+), 10107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
index cf365b4..969a1fc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
@@ -27,7 +27,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
index 421bb2e..e8c34cc 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
 import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.internal.InternalMapState;
 import org.apache.flink.util.Preconditions;

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/pom.xml b/flink-queryable-state/flink-queryable-state-client-java/pom.xml
new file mode 100644
index 0000000..8a4ff69
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/pom.xml
@@ -0,0 +1,90 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-queryable-state</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId>
+	<name>flink-queryable-state-client-java</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-netty</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-shaded-guava</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+			<type>test-jar</type>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java
new file mode 100644
index 0000000..e2af7b1
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/FutureUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Static helpers for working with {@link java.util.concurrent.Future Java Futures}.
+ */
+public class FutureUtils {
+
+	// ------------------------------------------------------------------------
+	//  Future Completed with an exception.
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Returns a new {@link CompletableFuture} that is already completed exceptionally
+	 * with the given {@code throwable}, so callers can report a failure without throwing.
+	 * @param throwable the exception to fail the future with.
+	 * @return The failed future, typed to the result type {@code T} expected by the caller.
+	 */
+	public static <T> CompletableFuture<T> getFailedFuture(Throwable throwable) {
+		CompletableFuture<T> failedAttempt = new CompletableFuture<>();
+		failedAttempt.completeExceptionally(throwable);
+		return failedAttempt;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java
new file mode 100644
index 0000000..992b283
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/KvStateID.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate;
+
+import org.apache.flink.util.AbstractID;
+
+/**
+ * Identifier for state instances; a thin {@link AbstractID} subclass whose constructors delegate to it.
+ *
+ * <p>Assigned when registering the state at the state registry.
+ */
+public class KvStateID extends AbstractID {
+
+	private static final long serialVersionUID = 1L;
+
+	public KvStateID() {
+		super();
+	}
+
+	public KvStateID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
new file mode 100644
index 0000000..304505a
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.queryablestate.FutureUtils;
+import org.apache.flink.queryablestate.client.state.ImmutableStateBinder;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Client for querying Flink's managed state.
+ *
+ * <p>You can mark state as queryable via {@link StateDescriptor#setQueryable(String)}.
+ * The state instance created from this descriptor will be published for queries when it's
+ * created on the Task Managers and the location will be reported to the Job Manager.
+ *
+ * <p>The client connects to a {@code Client Proxy} running on a given Task Manager. The
+ * proxy is the entry point of the client to the Flink cluster. It forwards the requests
+ * of the client to the Job Manager and the required Task Manager, and forwards the final
+ * response back to the client.
+ *
+ * <p>The proxy initially resolves the location of the requested KvState via the JobManager. Resolved
+ * locations are cached. When the server address of the requested KvState instance is determined, the
+ * client sends out a request to the server. The returned final answer is then forwarded to the Client.
+ */
+@PublicEvolving
+public class QueryableStateClient {
+
+	private static final Logger LOG = LoggerFactory.getLogger(QueryableStateClient.class);
+
+	/** The client that forwards the requests to the proxy. */
+	private final Client<KvStateRequest, KvStateResponse> client;
+
+	/** The address of the proxy this client is connected to. */
+	private final InetSocketAddress remoteAddress;
+
+	/** The execution configuration used to instantiate the different (de-)serializers. */
+	private ExecutionConfig executionConfig;
+
+	/**
+	 * Creates the client, resolving the hostname via {@link InetAddress#getByName(String)} and propagating its {@link UnknownHostException}.
+	 * @param remoteHostname the hostname of the {@code Client Proxy} to connect to; must not be {@code null}.
+	 * @param remotePort the port of the proxy to connect to.
+	 */
+	public QueryableStateClient(final String remoteHostname, final int remotePort) throws UnknownHostException {
+		this(InetAddress.getByName(Preconditions.checkNotNull(remoteHostname)), remotePort);
+	}
+
+	/**
+	 * Create the Queryable State Client.
+	 * @param remoteAddress the {@link InetAddress address} of the {@code Client Proxy} to connect to.
+	 * @param remotePort the port of the proxy to connect to; must lie in the valid port range 0-65535.
+	 */
+	public QueryableStateClient(final InetAddress remoteAddress, final int remotePort) {
+		Preconditions.checkArgument(remotePort >= 0 && remotePort <= 65535,
+				"Remote Port " + remotePort + " is out of valid port range (0-65535).");
+
+		this.remoteAddress = new InetSocketAddress(remoteAddress, remotePort);
+
+		final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<>(
+						new KvStateRequest.KvStateRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		this.client = new Client<>(
+				"Queryable State Client",
+				1,
+				messageSerializer,
+				new DisabledKvStateRequestStats());
+	}
+
+	/** Shuts down this client by shutting down the underlying network {@link Client}. */
+	public void shutdown() {
+		client.shutdown();
+	}
+
+	/**
+	 * Gets the current {@link ExecutionConfig}; {@code null} until one is set via {@link #setExecutionConfig(ExecutionConfig)}.
+	 */
+	public ExecutionConfig getExecutionConfig() {
+		return executionConfig;
+	}
+
+	/**
+	 * Replaces the existing {@link ExecutionConfig} (possibly {@code null}) with the provided one.
+	 * @param config The new configuration used to create the (de-)serializers of subsequent queries.
+	 * @return The old configuration, or {@code null} if none was specified.
+	 */
+	public ExecutionConfig setExecutionConfig(ExecutionConfig config) {
+		ExecutionConfig prev = executionConfig;
+		this.executionConfig = config;
+		return prev;
+	}
+
+	/**
+	 * Returns a future holding the request result; the key type is taken from {@code keyTypeHint}.
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key                       The key we are interested in.
+	 * @param keyTypeHint               A {@link TypeHint} used to extract the type of the key; must not be {@code null}.
+	 * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
+	 * @return Future holding the immutable {@link State} object containing the result.
+	 */
+	@PublicEvolving
+	public <K, S extends State, V> CompletableFuture<S> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final TypeHint<K> keyTypeHint,
+			final StateDescriptor<S, V> stateDescriptor) {
+
+		Preconditions.checkNotNull(keyTypeHint);
+
+		TypeInformation<K> keyTypeInfo = keyTypeHint.getTypeInfo();
+		return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor);
+	}
+
+	/**
+	 * Returns a future holding the request result, querying the state under the {@link VoidNamespace}.
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key                       The key we are interested in.
+	 * @param keyTypeInfo               The {@link TypeInformation} of the key.
+	 * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
+	 * @return Future holding the immutable {@link State} object containing the result.
+	 */
+	@PublicEvolving
+	public <K, S extends State, V> CompletableFuture<S> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final TypeInformation<K> keyTypeInfo,
+			final StateDescriptor<S, V> stateDescriptor) {
+
+		return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE,
+				keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor);
+	}
+
+	/**
+	 * Returns a future holding the request result for the given key and namespace; none of the arguments may be {@code null}.
+	 * @param jobId                     JobID of the job the queryable state belongs to.
+	 * @param queryableStateName        Name under which the state is queryable.
+	 * @param key                       The key that the state we request is associated with; its {@code hashCode()} is sent with the request.
+	 * @param namespace                 The namespace of the state.
+	 * @param keyTypeInfo               The {@link TypeInformation} of the keys.
+	 * @param namespaceTypeInfo         The {@link TypeInformation} of the namespace.
+	 * @param stateDescriptor           The {@link StateDescriptor} of the state we want to query.
+	 * @return Future holding the immutable {@link State} object; it fails if key/namespace serialization or binding the response fails.
+	 */
+	@PublicEvolving
+	public <K, N, S extends State, V> CompletableFuture<S> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final K key,
+			final N namespace,
+			final TypeInformation<K> keyTypeInfo,
+			final TypeInformation<N> namespaceTypeInfo,
+			final StateDescriptor<S, V> stateDescriptor) {
+
+		Preconditions.checkNotNull(jobId);
+		Preconditions.checkNotNull(queryableStateName);
+		Preconditions.checkNotNull(key);
+		Preconditions.checkNotNull(namespace);
+
+		Preconditions.checkNotNull(keyTypeInfo);
+		Preconditions.checkNotNull(namespaceTypeInfo);
+		Preconditions.checkNotNull(stateDescriptor);
+		// NOTE(review): executionConfig stays null unless setExecutionConfig(...) was called - confirm the calls below accept null.
+		TypeSerializer<K> keySerializer = keyTypeInfo.createSerializer(executionConfig);
+		TypeSerializer<N> namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig);
+
+		stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+
+		final byte[] serializedKeyAndNamespace;
+		try {
+			serializedKeyAndNamespace = KvStateSerializer
+					.serializeKeyAndNamespace(key, keySerializer, namespace, namespaceSerializer);
+		} catch (IOException e) {
+			return FutureUtils.getFailedFuture(e);
+		}
+
+		return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace).thenApply(
+				stateResponse -> {
+					try {
+						return stateDescriptor.bind(new ImmutableStateBinder(stateResponse.getContent()));
+					} catch (Exception e) {
+						throw new FlinkRuntimeException(e);
+					}
+				});
+	}
+
+	/**
+	 * Builds a {@link KvStateRequest} and sends it to the proxy at {@code remoteAddress}.
+	 *
+	 * @param jobId                     JobID of the job the queryable state
+	 *                                  belongs to
+	 * @param queryableStateName        Name under which the state is queryable
+	 * @param keyHashCode               Integer hash code of the key (result of
+	 *                                  a call to {@link Object#hashCode()})
+	 * @param serializedKeyAndNamespace Serialized key and namespace to query
+	 *                                  KvState instance with
+	 * @return Future holding the serialized result, or a failed future if creating or sending the request throws
+	 */
+	private CompletableFuture<KvStateResponse> getKvState(
+			final JobID jobId,
+			final String queryableStateName,
+			final int keyHashCode,
+			final byte[] serializedKeyAndNamespace) {
+		LOG.info("Sending State Request to {}.", remoteAddress);
+		try {
+			KvStateRequest request = new KvStateRequest(jobId, queryableStateName, keyHashCode, serializedKeyAndNamespace);
+			return client.sendRequest(remoteAddress, request);
+		} catch (Exception e) {
+			LOG.error("Unable to send KVStateRequest: ", e);
+			return FutureUtils.getFailedFuture(e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java
new file mode 100644
index 0000000..0560ec2
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespace.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.annotation.Internal;
+
+import java.io.ObjectStreamException;
+
+/**
+ * Placeholder namespace for state that has no namespace; only a single instance ever exists.
+ *
+ * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
+ */
+@Internal
+public final class VoidNamespace {
+
+	// ------------------------------------------------------------------------
+	//  The one and only instance
+	// ------------------------------------------------------------------------
+
+	/** Shared instance, handed out by {@link #get()}. */
+	public static final VoidNamespace INSTANCE = new VoidNamespace();
+
+	/** Private so that {@link #INSTANCE} stays the only instance. */
+	private VoidNamespace() {}
+
+	/** Returns the shared instance. */
+	public static VoidNamespace get() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Object methods
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean equals(Object obj) {
+		return this == obj;
+	}
+
+	@Override
+	public int hashCode() {
+		return 99;
+	}
+
+	@Override
+	public String toString() {
+		return VoidNamespace.class.getSimpleName();
+	}
+
+	// ------------------------------------------------------------------------
+	//  Java serialization
+	// ------------------------------------------------------------------------
+
+	// deserialization hook: always resolve to the shared instance, never to a copy
+	private Object readResolve() throws ObjectStreamException {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
new file mode 100644
index 0000000..38db705
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceSerializer.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * {@link TypeSerializerSingleton} implementation for the {@link VoidNamespace} placeholder.
+ *
+ * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
+ */
+@Internal
+public final class VoidNamespaceSerializer extends TypeSerializerSingleton<VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	public static final VoidNamespaceSerializer INSTANCE = new VoidNamespaceSerializer();
+
+	@Override
+	public boolean isImmutableType() {
+		return true;
+	}
+
+	@Override
+	public VoidNamespace createInstance() {
+		return VoidNamespace.INSTANCE;
+	}
+
+	@Override
+	public VoidNamespace copy(VoidNamespace from) {
+		return VoidNamespace.INSTANCE;
+	}
+
+	@Override
+	public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) {
+		return VoidNamespace.INSTANCE;
+	}
+
+	@Override
+	public int getLength() {
+		// NOTE(review): reports 0 although serialize() emits one byte; confirm this is intended.
+		return 0;
+	}
+
+	@Override
+	public void serialize(VoidNamespace record, DataOutputView target) throws IOException {
+		// A single zero byte is emitted so that the stream always advances.
+		// Given how the state backends use this serializer, writing nothing
+		// would be enough, but that would become a problem if the serializer
+		// were ever used elsewhere (unlikely as that is).
+		target.write(0);
+	}
+
+	@Override
+	public VoidNamespace deserialize(DataInputView source) throws IOException {
+		source.readByte();
+		return VoidNamespace.INSTANCE;
+	}
+
+	@Override
+	public VoidNamespace deserialize(VoidNamespace reuse, DataInputView source) throws IOException {
+		return deserialize(source);
+	}
+
+	@Override
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		final byte placeholder = source.readByte();
+		target.write(placeholder);
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof VoidNamespaceSerializer;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
new file mode 100644
index 0000000..2efb87b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+
+/**
+ * Type information describing the {@link VoidNamespace} placeholder type.
+ *
+ * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
+ */
+@Internal
+public class VoidNamespaceTypeInfo extends TypeInformation<VoidNamespace> {
+
+	private static final long serialVersionUID = 5453679706408610586L;
+
+	public static final VoidNamespaceTypeInfo INSTANCE = new VoidNamespaceTypeInfo();
+
+	@Override
+	public Class<VoidNamespace> getTypeClass() {
+		return VoidNamespace.class;
+	}
+
+	@Override
+	public TypeSerializer<VoidNamespace> createSerializer(ExecutionConfig config) {
+		return VoidNamespaceSerializer.INSTANCE;
+	}
+
+	@Override
+	public boolean isBasicType() {
+		return false;
+	}
+
+	@Override
+	public boolean isTupleType() {
+		return false;
+	}
+
+	@Override
+	public boolean isKeyType() {
+		return false;
+	}
+
+	@Override
+	public int getArity() {
+		return 0;
+	}
+
+	@Override
+	public int getTotalFields() {
+		return 0;
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		return obj instanceof VoidNamespaceTypeInfo;
+	}
+
+	@Override
+	public boolean canEqual(Object obj) {
+		return obj instanceof VoidNamespaceTypeInfo;
+	}
+
+	@Override
+	public int hashCode() {
+		return 0;
+	}
+
+	@Override
+	public String toString() {
+		return "VoidNamespaceTypeInfo";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
new file mode 100644
index 0000000..8964fbf
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableAggregatingState.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * An {@link AggregatingState} snapshot that can be read but <b>never</b> modified.
+ *
+ * <p>Instances are what the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client}
+ * returns for a query of Flink's keyed state made with an {@link AggregatingStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableAggregatingState<IN, OUT> extends ImmutableState implements AggregatingState<IN, OUT> {
+
+	private final OUT aggregate;
+
+	private ImmutableAggregatingState(OUT aggregate) {
+		this.aggregate = Preconditions.checkNotNull(aggregate);
+	}
+
+	@Override
+	public OUT get() {
+		return aggregate;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void add(Object newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <IN, ACC, OUT> ImmutableAggregatingState<IN, OUT> createState(
+			final AggregatingStateDescriptor<IN, ACC, OUT> stateDescriptor,
+			final byte[] serializedValue) throws IOException {
+
+		// The bytes decode to the accumulator; the descriptor's function derives the result.
+		final ACC acc =
+				KvStateSerializer.deserializeValue(serializedValue, stateDescriptor.getSerializer());
+
+		final OUT result = stateDescriptor.getAggregateFunction().getResult(acc);
+		return new ImmutableAggregatingState<>(result);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
new file mode 100644
index 0000000..25f3118
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableFoldingState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link FoldingState} snapshot that can be read but never modified.
+ *
+ * <p>Instances are what the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client}
+ * returns for a query of Flink's keyed state made with a {@link FoldingStateDescriptor}.
+ */
+@PublicEvolving
+@Deprecated
+public final class ImmutableFoldingState<IN, ACC> extends ImmutableState implements FoldingState<IN, ACC> {
+
+	private final ACC folded;
+
+	private ImmutableFoldingState(ACC folded) {
+		this.folded = Preconditions.checkNotNull(folded);
+	}
+
+	@Override
+	public ACC get() {
+		return folded;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void add(Object newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <IN, ACC> ImmutableFoldingState<IN, ACC> createState(
+			final FoldingStateDescriptor<IN, ACC> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final ACC accumulator =
+				KvStateSerializer.deserializeValue(serializedState, stateDescriptor.getSerializer());
+
+		return new ImmutableFoldingState<>(accumulator);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
new file mode 100644
index 0000000..3dcd75d
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableListState.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A read-only {@link ListState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link ListStateDescriptor}. The elements are exposed through an unmodifiable view.
+ */
+@PublicEvolving
+public final class ImmutableListState<V> extends ImmutableState implements ListState<V> {
+
+	/** Unmodifiable view, so neither a cast to List nor Iterator#remove() can alter the result. */
+	private final List<V> listState;
+
+	private ImmutableListState(final List<V> state) {
+		this.listState = java.util.Collections.unmodifiableList(Preconditions.checkNotNull(state));
+	}
+
+	@Override
+	public Iterable<V> get() {
+		return listState;
+	}
+
+	@Override
+	public void add(V value) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <V> ImmutableListState<V> createState(
+			final ListStateDescriptor<V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final List<V> state = KvStateSerializer.deserializeList(
+				serializedState, stateDescriptor.getElementSerializer());
+		return new ImmutableListState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
new file mode 100644
index 0000000..bb08cf0
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableMapState.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A read-only {@link MapState} that does not allow for modifications.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link MapStateDescriptor}.
+ *
+ * <p>Every mutating method throws, and all exposed views and iterators are unmodifiable.
+ */
+@PublicEvolving
+public final class ImmutableMapState<K, V> extends ImmutableState implements MapState<K, V> {
+
+	/** The deserialized mappings; only ever exposed through unmodifiable views. */
+	private final Map<K, V> state;
+
+	private ImmutableMapState(final Map<K, V> mapState) {
+		this.state = Preconditions.checkNotNull(mapState);
+	}
+
+	@Override
+	public V get(K key) {
+		return state.get(key);
+	}
+
+	@Override
+	public void put(K key, V value) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void putAll(Map<K, V> map) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void remove(K key) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public boolean contains(K key) {
+		return state.containsKey(key);
+	}
+
+	/**
+	 * Returns all the mappings in the state as the entry set of a
+	 * {@link Collections#unmodifiableMap(Map)} view.
+	 *
+	 * @return A read-only iterable view of all the key-value pairs in the state; the entries
+	 *         themselves are read-only as well.
+	 */
+	@Override
+	public Iterable<Map.Entry<K, V>> entries() {
+		// unmodifiableSet(entrySet()) would still hand out entries that accept setValue();
+		// the entry set of an unmodifiable map view wraps each entry as well.
+		return Collections.unmodifiableMap(state).entrySet();
+	}
+
+	/**
+	 * Returns all the keys in the state in a {@link Collections#unmodifiableSet(Set)}.
+	 *
+	 * @return A read-only iterable view of all the keys in the state.
+	 */
+	@Override
+	public Iterable<K> keys() {
+		return Collections.unmodifiableSet(state.keySet());
+	}
+
+	/**
+	 * Returns all the values in the state in a {@link Collections#unmodifiableCollection(Collection)}.
+	 *
+	 * @return A read-only iterable view of all the values in the state.
+	 */
+	@Override
+	public Iterable<V> values() {
+		return Collections.unmodifiableCollection(state.values());
+	}
+
+	/**
+	 * Iterates over all the mappings in the state. The iterator cannot
+	 * remove elements, and the entries it returns cannot be updated.
+	 *
+	 * @return A read-only iterator over all the mappings in the state
+	 */
+	@Override
+	public Iterator<Map.Entry<K, V>> iterator() {
+		// Same entry-protecting view as entries(): setValue() on a returned entry is rejected.
+		return Collections.unmodifiableMap(state).entrySet().iterator();
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <K, V> ImmutableMapState<K, V> createState(
+			final MapStateDescriptor<K, V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final Map<K, V> state = KvStateSerializer.deserializeMap(
+				serializedState,
+				stateDescriptor.getKeySerializer(),
+				stateDescriptor.getValueSerializer());
+		return new ImmutableMapState<>(state);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
new file mode 100644
index 0000000..46b477f
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableReducingState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A {@link ReducingState} snapshot that can be read but never modified.
+ *
+ * <p>Instances are what the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client}
+ * returns for a query of Flink's keyed state made with a {@link ReducingStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableReducingState<V> extends ImmutableState implements ReducingState<V> {
+
+	private final V reduced;
+
+	private ImmutableReducingState(V reduced) {
+		this.reduced = Preconditions.checkNotNull(reduced);
+	}
+
+	@Override
+	public V get() {
+		return reduced;
+	}
+
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	@Override
+	public void add(V newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	public static <V> ImmutableReducingState<V> createState(
+			final ReducingStateDescriptor<V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final V decoded =
+				KvStateSerializer.deserializeValue(serializedState, stateDescriptor.getSerializer());
+
+		return new ImmutableReducingState<>(decoded);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
new file mode 100644
index 0000000..863f07b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+/**
+ * Base class of the <b>read-only</b> state types the Queryable State Client returns as results;
+ * it holds one shared, pre-built exception for rejecting modification attempts.
+ */
+abstract class ImmutableState {
+
+	protected static final UnsupportedOperationException MODIFICATION_ATTEMPT_ERROR =
+			new UnsupportedOperationException("State is read-only. No modifications allowed.");
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
new file mode 100644
index 0000000..6ce2787
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableStateBinder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.StateBinder;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link StateBinder} used to deserialize the results returned by the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient}.
+ *
+ * <p>Each {@code create...State} method hands the bytes given at construction time,
+ * together with the passed descriptor, to the matching immutable
+ * {@link org.apache.flink.api.common.state.State State} implementation and returns
+ * the object created by it.
+ */
+public class ImmutableStateBinder implements StateBinder {
+
+	/** Raw bytes from which every state created by this binder is built. */
+	private final byte[] stateBytes;
+
+	/**
+	 * Creates a binder for the given serialized state.
+	 *
+	 * @param content the serialized state, must not be {@code null}
+	 */
+	public ImmutableStateBinder(final byte[] content) {
+		this.stateBytes = Preconditions.checkNotNull(content);
+	}
+
+	@Override
+	public <T> ValueState<T> createValueState(ValueStateDescriptor<T> descriptor) throws Exception {
+		return ImmutableValueState.createState(descriptor, stateBytes);
+	}
+
+	@Override
+	public <T> ListState<T> createListState(ListStateDescriptor<T> descriptor) throws Exception {
+		return ImmutableListState.createState(descriptor, stateBytes);
+	}
+
+	@Override
+	public <T> ReducingState<T> createReducingState(ReducingStateDescriptor<T> descriptor) throws Exception {
+		return ImmutableReducingState.createState(descriptor, stateBytes);
+	}
+
+	@Override
+	public <IN, ACC, OUT> AggregatingState<IN, OUT> createAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> descriptor) throws Exception {
+		return ImmutableAggregatingState.createState(descriptor, stateBytes);
+	}
+
+	@Override
+	public <T, ACC> FoldingState<T, ACC> createFoldingState(FoldingStateDescriptor<T, ACC> descriptor) throws Exception {
+		return ImmutableFoldingState.createState(descriptor, stateBytes);
+	}
+
+	@Override
+	public <MK, MV> MapState<MK, MV> createMapState(MapStateDescriptor<MK, MV> descriptor) throws Exception {
+		return ImmutableMapState.createState(descriptor, stateBytes);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
new file mode 100644
index 0000000..f3ddd2b
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/ImmutableValueState.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * An unmodifiable {@link ValueState}: the value can be read, but
+ * {@link #update(Object)} and {@link #clear()} always fail.
+ *
+ * <p>This is the result returned when querying Flink's keyed state using the
+ * {@link org.apache.flink.queryablestate.client.QueryableStateClient Queryable State Client} and
+ * providing a {@link ValueStateDescriptor}.
+ */
+@PublicEvolving
+public final class ImmutableValueState<V> extends ImmutableState implements ValueState<V> {
+
+	/** The value exposed by this state; checked to be non-null on construction. */
+	private final V value;
+
+	private ImmutableValueState(V value) {
+		Preconditions.checkNotNull(value);
+		this.value = value;
+	}
+
+	/**
+	 * Deserializes the given bytes with the serializer of the descriptor and wraps
+	 * the outcome in a read-only state.
+	 *
+	 * @param stateDescriptor descriptor supplying the value serializer
+	 * @param serializedState the serialized value
+	 * @return a read-only state holding the deserialized value
+	 * @throws IOException if thrown while deserializing the value
+	 */
+	public static <V> ImmutableValueState<V> createState(
+			final ValueStateDescriptor<V> stateDescriptor,
+			final byte[] serializedState) throws IOException {
+
+		final V deserialized =
+				KvStateSerializer.deserializeValue(serializedState, stateDescriptor.getSerializer());
+		return new ImmutableValueState<>(deserialized);
+	}
+
+	@Override
+	public V value() {
+		return this.value;
+	}
+
+	/** Always throws, because this state is read-only. */
+	@Override
+	public void update(V newValue) {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+
+	/** Always throws, because this state is read-only. */
+	@Override
+	public void clear() {
+		throw MODIFICATION_ATTEMPT_ERROR;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
new file mode 100644
index 0000000..878df85
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/state/serialization/DataInputDeserializer.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.state.serialization;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemoryUtils;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.UTFDataFormatException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+/**
+ * A simple and efficient deserializer for the {@link java.io.DataInput} interface.
+ *
+ * <p>Data is read from the region {@code [position, end)} of a backing byte array.
+ * Multi-byte values are interpreted in big-endian byte order.
+ *
+ * <p><b>THIS WAS COPIED FROM RUNTIME SO THAT WE AVOID THE DEPENDENCY.</b>
+ */
+public class DataInputDeserializer implements DataInputView, java.io.Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// ------------------------------------------------------------------------
+
+	/** The backing array; {@code null} until a buffer is set and after {@link #releaseArrays()}. */
+	private byte[] buffer;
+
+	/** Index one past the last readable byte of {@link #buffer}. */
+	private int end;
+
+	/** Index of the next byte to read from {@link #buffer}. */
+	private int position;
+
+	// ------------------------------------------------------------------------
+
+	/** Creates a deserializer without a buffer; it has no bytes to read until one is set. */
+	public DataInputDeserializer() {}
+
+	/**
+	 * Creates a deserializer reading the whole given array.
+	 *
+	 * @param buffer the array to read from, must not be {@code null}
+	 */
+	public DataInputDeserializer(byte[] buffer) {
+		setBuffer(buffer, 0, buffer.length);
+	}
+
+	/**
+	 * Creates a deserializer reading {@code len} bytes of the given array, starting at {@code start}.
+	 *
+	 * @see #setBuffer(byte[], int, int)
+	 */
+	public DataInputDeserializer(byte[] buffer, int start, int len) {
+		setBuffer(buffer, start, len);
+	}
+
+	/**
+	 * Creates a deserializer reading the remaining bytes of the given buffer.
+	 *
+	 * @see #setBuffer(ByteBuffer)
+	 */
+	public DataInputDeserializer(ByteBuffer buffer) {
+		setBuffer(buffer);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Changing buffers
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Makes this deserializer read the remaining bytes of the given {@link ByteBuffer}.
+	 *
+	 * <p>For a buffer with an accessible backing array, that array is used directly (not copied).
+	 * For a direct buffer, the remaining bytes are copied into a new array, which consumes
+	 * them from the given buffer.
+	 *
+	 * @param buffer the buffer to read from
+	 * @throws IllegalArgumentException if the buffer neither exposes a backing array nor is direct
+	 */
+	public void setBuffer(ByteBuffer buffer) {
+		if (buffer.hasArray()) {
+			this.buffer = buffer.array();
+			this.position = buffer.arrayOffset() + buffer.position();
+			this.end = this.position + buffer.remaining();
+		} else if (buffer.isDirect()) {
+			this.buffer = new byte[buffer.remaining()];
+			this.position = 0;
+			this.end = this.buffer.length;
+
+			buffer.get(this.buffer);
+		} else {
+			throw new IllegalArgumentException("The given buffer is neither an array-backed heap ByteBuffer, nor a direct ByteBuffer.");
+		}
+	}
+
+	/**
+	 * Makes this deserializer read {@code len} bytes of the given array, starting at {@code start}.
+	 * The array is used directly and is not copied.
+	 *
+	 * @param buffer the array to read from
+	 * @param start index of the first byte to read
+	 * @param len number of readable bytes
+	 * @throws NullPointerException if {@code buffer} is {@code null}
+	 * @throws IllegalArgumentException if {@code start} or {@code len} is negative, or the
+	 *         region does not fit into the array
+	 */
+	public void setBuffer(byte[] buffer, int start, int len) {
+		if (buffer == null) {
+			throw new NullPointerException();
+		}
+
+		// compare against "buffer.length - len" rather than computing "start + len",
+		// because the sum can overflow and would then slip through the check
+		if (start < 0 || len < 0 || start > buffer.length - len) {
+			throw new IllegalArgumentException();
+		}
+
+		this.buffer = buffer;
+		this.position = start;
+		this.end = start + len;
+	}
+
+	/** Drops the reference to the backing array. */
+	public void releaseArrays() {
+		this.buffer = null;
+	}
+
+	// ----------------------------------------------------------------------------------------
+	//                               Data Input
+	// ----------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the number of bytes that are left to read.
+	 *
+	 * @return the number of unread bytes, never negative
+	 */
+	public int available() {
+		return position < end ? end - position : 0;
+	}
+
+	@Override
+	public boolean readBoolean() throws IOException {
+		if (this.position >= this.end) {
+			throw new EOFException();
+		}
+		return this.buffer[this.position++] != 0;
+	}
+
+	@Override
+	public byte readByte() throws IOException {
+		if (this.position >= this.end) {
+			throw new EOFException();
+		}
+		return this.buffer[this.position++];
+	}
+
+	@Override
+	public char readChar() throws IOException {
+		// two bytes are needed
+		if (this.position >= this.end - 1) {
+			throw new EOFException();
+		}
+		return (char) (((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff));
+	}
+
+	@Override
+	public double readDouble() throws IOException {
+		return Double.longBitsToDouble(readLong());
+	}
+
+	@Override
+	public float readFloat() throws IOException {
+		return Float.intBitsToFloat(readInt());
+	}
+
+	@Override
+	public void readFully(byte[] b) throws IOException {
+		readFully(b, 0, b.length);
+	}
+
+	/**
+	 * Copies exactly {@code len} bytes into {@code b}, starting at index {@code off}.
+	 *
+	 * @throws IllegalArgumentException if {@code len} is negative
+	 * @throws ArrayIndexOutOfBoundsException if the bytes do not fit into {@code b}
+	 * @throws EOFException if fewer than {@code len} bytes are left to read
+	 */
+	@Override
+	public void readFully(byte[] b, int off, int len) throws IOException {
+		if (len < 0) {
+			throw new IllegalArgumentException("Length may not be negative.");
+		}
+		if (off > b.length - len) {
+			throw new ArrayIndexOutOfBoundsException();
+		}
+		if (this.position > this.end - len) {
+			throw new EOFException();
+		}
+
+		System.arraycopy(this.buffer, position, b, off, len);
+		position += len;
+	}
+
+	@Override
+	public int readInt() throws IOException {
+		// four bytes are needed
+		if (this.position < 0 || this.position >= this.end - 3) {
+			throw new EOFException();
+		}
+
+		@SuppressWarnings("restriction")
+		int value = UNSAFE.getInt(this.buffer, BASE_OFFSET + this.position);
+		// the bytes were fetched in native order; the data is big-endian
+		if (LITTLE_ENDIAN) {
+			value = Integer.reverseBytes(value);
+		}
+
+		this.position += 4;
+		return value;
+	}
+
+	/**
+	 * Reads the bytes up to and including the next {@code '\n'}, or up to the end of the
+	 * data if there is no further {@code '\n'}. Every byte is turned into one character.
+	 *
+	 * @return the line without its terminating {@code '\n'} and without a {@code '\r'}
+	 *         directly preceding the end of the line, or {@code null} if no bytes are left
+	 */
+	@Override
+	public String readLine() throws IOException {
+		if (this.position >= this.end) {
+			return null;
+		}
+
+		// read until a newline is found or the data ends; the final byte of an
+		// unterminated last line belongs to the line as well
+		StringBuilder bld = new StringBuilder();
+		while (this.position < this.end) {
+			char curr = (char) (this.buffer[this.position++] & 0xff);
+			if (curr == '\n') {
+				break;
+			}
+			bld.append(curr);
+		}
+
+		// trim a trailing carriage return
+		int len = bld.length();
+		if (len > 0 && bld.charAt(len - 1) == '\r') {
+			bld.setLength(len - 1);
+		}
+		return bld.toString();
+	}
+
+	@Override
+	public long readLong() throws IOException {
+		// eight bytes are needed
+		if (position < 0 || position >= this.end - 7) {
+			throw new EOFException();
+		}
+
+		@SuppressWarnings("restriction")
+		long value = UNSAFE.getLong(this.buffer, BASE_OFFSET + this.position);
+		// the bytes were fetched in native order; the data is big-endian
+		if (LITTLE_ENDIAN) {
+			value = Long.reverseBytes(value);
+		}
+
+		this.position += 8;
+		return value;
+	}
+
+	@Override
+	public short readShort() throws IOException {
+		// two bytes are needed
+		if (position < 0 || position >= this.end - 1) {
+			throw new EOFException();
+		}
+		return (short) ((((this.buffer[position++]) & 0xff) << 8) | ((this.buffer[position++]) & 0xff));
+	}
+
+	/**
+	 * Reads a string that is stored as an unsigned two-byte length followed by that many
+	 * bytes, in which a character occupies one, two or three bytes.
+	 *
+	 * @throws EOFException if the data ends before the announced number of bytes was read
+	 * @throws UTFDataFormatException if the bytes are not a valid encoding
+	 */
+	@Override
+	public String readUTF() throws IOException {
+		int utflen = readUnsignedShort();
+		byte[] bytearr = new byte[utflen];
+		char[] chararr = new char[utflen];
+
+		int c, char2, char3;
+		int count = 0;
+		int chararrCount = 0;
+
+		readFully(bytearr, 0, utflen);
+
+		// fast path: leading run of single-byte characters
+		while (count < utflen) {
+			c = (int) bytearr[count] & 0xff;
+			if (c > 127) {
+				break;
+			}
+			count++;
+			chararr[chararrCount++] = (char) c;
+		}
+
+		// general path: the upper four bits of the first byte select the sequence length
+		while (count < utflen) {
+			c = (int) bytearr[count] & 0xff;
+			switch (c >> 4) {
+			case 0:
+			case 1:
+			case 2:
+			case 3:
+			case 4:
+			case 5:
+			case 6:
+			case 7:
+				/* 0xxxxxxx */
+				count++;
+				chararr[chararrCount++] = (char) c;
+				break;
+			case 12:
+			case 13:
+				/* 110x xxxx 10xx xxxx */
+				count += 2;
+				if (count > utflen) {
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
+				char2 = (int) bytearr[count - 1];
+				if ((char2 & 0xC0) != 0x80) {
+					throw new UTFDataFormatException("malformed input around byte " + count);
+				}
+				chararr[chararrCount++] = (char) (((c & 0x1F) << 6) | (char2 & 0x3F));
+				break;
+			case 14:
+				/* 1110 xxxx 10xx xxxx 10xx xxxx */
+				count += 3;
+				if (count > utflen) {
+					throw new UTFDataFormatException("malformed input: partial character at end");
+				}
+				char2 = (int) bytearr[count - 2];
+				char3 = (int) bytearr[count - 1];
+				if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) {
+					throw new UTFDataFormatException("malformed input around byte " + (count - 1));
+				}
+				chararr[chararrCount++] = (char) (((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | (char3 & 0x3F));
+				break;
+			default:
+				/* 10xx xxxx, 1111 xxxx */
+				throw new UTFDataFormatException("malformed input around byte " + count);
+			}
+		}
+		// The number of chars produced may be less than utflen
+		return new String(chararr, 0, chararrCount);
+	}
+
+	@Override
+	public int readUnsignedByte() throws IOException {
+		if (this.position >= this.end) {
+			throw new EOFException();
+		}
+		return (this.buffer[this.position++] & 0xff);
+	}
+
+	@Override
+	public int readUnsignedShort() throws IOException {
+		// two bytes are needed
+		if (this.position >= this.end - 1) {
+			throw new EOFException();
+		}
+		return ((this.buffer[this.position++] & 0xff) << 8) | (this.buffer[this.position++] & 0xff);
+	}
+
+	/**
+	 * Skips up to {@code n} bytes, stopping early at the end of the data.
+	 *
+	 * @param n the number of bytes to skip; for zero or a negative number nothing is skipped
+	 * @return the number of bytes actually skipped
+	 */
+	@Override
+	public int skipBytes(int n) throws IOException {
+		// a negative count must not move the read position backwards
+		if (n <= 0) {
+			return 0;
+		}
+
+		if (this.position <= this.end - n) {
+			this.position += n;
+			return n;
+		} else {
+			n = this.end - this.position;
+			this.position = this.end;
+			return n;
+		}
+	}
+
+	/**
+	 * Skips exactly {@code numBytes} bytes.
+	 *
+	 * @throws EOFException if fewer than {@code numBytes} bytes could be skipped
+	 */
+	@Override
+	public void skipBytesToRead(int numBytes) throws IOException {
+		int skippedBytes = skipBytes(numBytes);
+
+		if (skippedBytes < numBytes){
+			throw new EOFException("Could not skip " + numBytes + " bytes.");
+		}
+	}
+
+	/**
+	 * Copies up to {@code len} of the remaining bytes into {@code b}, starting at index {@code off}.
+	 *
+	 * @return the number of bytes copied, or {@code -1} if no bytes are left to read
+	 * @throws NullPointerException if {@code b} is {@code null}
+	 * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or
+	 *         {@code b} has less than {@code len} bytes of space after {@code off}
+	 */
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		if (b == null){
+			throw new NullPointerException("Byte array b cannot be null.");
+		}
+
+		if (off < 0){
+			throw new IndexOutOfBoundsException("Offset cannot be negative.");
+		}
+
+		if (len < 0){
+			throw new IndexOutOfBoundsException("Length cannot be negative.");
+		}
+
+		if (b.length - off < len){
+			throw new IndexOutOfBoundsException("Byte array does not provide enough space to store requested data" +
+					".");
+		}
+
+		if (this.position >= this.end) {
+			return -1;
+		} else {
+			int toRead = Math.min(this.end - this.position, len);
+			System.arraycopy(this.buffer, this.position, b, off, toRead);
+			this.position += toRead;
+
+			return toRead;
+		}
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return read(b, 0, b.length);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
+	@SuppressWarnings("restriction")
+	private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
+
+	/** Offset of the first element within a byte array, as needed for the unsafe reads. */
+	@SuppressWarnings("restriction")
+	private static final long BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
+
+	/** Whether values fetched in native byte order must be reversed to obtain big-endian values. */
+	private static final boolean LITTLE_ENDIAN = (MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
+}