You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:06 UTC

[06/14] flink git commit: [FLINK-7769][QS] Move queryable state outside the runtime.

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..15a5ff6
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the NON-HA mode.
+ */
+public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
+
+	private static final int NUM_JMS = 2;
+	private static final int NUM_TMS = 4;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	private static TestingServer zkServer;
+	private static TemporaryFolder temporaryFolder;
+
+	@BeforeClass
+	public static void setup() {
+		try {
+			zkServer = new TestingServer();
+			temporaryFolder = new TemporaryFolder();
+			temporaryFolder.create();
+
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			cluster = new TestingCluster(config, false);
+			cluster.start();
+
+			testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+			// verify that we are in HA mode
+			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.ZOOKEEPER);
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		if (cluster != null) {
+			cluster.stop();
+			cluster.awaitTermination();
+		}
+
+		testActorSystem.shutdown();
+		testActorSystem.awaitTermination();
+
+		try {
+			zkServer.stop();
+			zkServer.close();
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		temporaryFolder.delete();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..a2d3ad0
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link FsStateBackend}.
+ */
+public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..fda1171
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
+ */
+public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..907e8a3
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization using
+ * the KvStateSerializer with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
+	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
+	 * the tests.
+	 *
+	 * @param <K> key type
+	 */
+	static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+		RocksDBKeyedStateBackend2(
+				final String operatorIdentifier,
+				final ClassLoader userCodeClassLoader,
+				final File instanceBasePath,
+				final DBOptions dbOptions,
+				final ColumnFamilyOptions columnFamilyOptions,
+				final TaskKvStateRegistry kvStateRegistry,
+				final TypeSerializer<K> keySerializer,
+				final int numberOfKeyGroups,
+				final KeyGroupRange keyGroupRange,
+				final ExecutionConfig executionConfig) throws Exception {
+
+			super(operatorIdentifier, userCodeClassLoader,
+				instanceBasePath,
+				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
+				numberOfKeyGroups, keyGroupRange, executionConfig, false);
+		}
+
+		@Override
+		public <N, T> InternalListState<N, T> createListState(
+			final TypeSerializer<N> namespaceSerializer,
+			final ListStateDescriptor<T> stateDesc) throws Exception {
+
+			return super.createListState(namespaceSerializer, stateDesc);
+		}
+	}
+
+	/**
+	 * Tests list serialization and deserialization match.
+	 *
+	 * @see KvStateRequestSerializerTest#testListSerialization()
+	 * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testListSerialization() throws Exception {
+		final long key = 0L;
+
+		// objects for RocksDB state list serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend2<>(
+				"no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
+			);
+		longHeapKeyedStateBackend.restore(null);
+		longHeapKeyedStateBackend.setCurrentKey(key);
+
+		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
+			.createListState(VoidNamespaceSerializer.INSTANCE,
+				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
+		KvStateRequestSerializerTest.testListSerialization(key, listState);
+	}
+
+	/**
+	 * Tests map serialization and deserialization match.
+	 *
+	 * @see KvStateRequestSerializerTest#testMapSerialization()
+	 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testMapSerialization() throws Exception {
+		final long key = 0L;
+
+		// objects for RocksDB state list serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend<>(
+				"no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig(),
+				false);
+		longHeapKeyedStateBackend.restore(null);
+		longHeapKeyedStateBackend.setCurrentKey(key);
+
+		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
+				longHeapKeyedStateBackend.getPartitionedState(
+						VoidNamespace.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE,
+						new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..c52acc8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ */
+public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
+
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	@BeforeClass
+	public static void setup() {
+		try {
+			Configuration config = new Configuration();
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+
+			cluster = new TestingCluster(config, false);
+			cluster.start(true);
+
+			testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+			// verify that we are not in HA mode
+			Assert.assertTrue(cluster.haMode() == HighAvailabilityMode.NONE);
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@AfterClass
+	public static void tearDown() {
+		try {
+			cluster.shutdown();
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..caa315a
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link FsStateBackend}.
+ */
+public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new FsStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..10e9b57
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
new file mode 100644
index 0000000..d9a41a1
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownJobManager;
+import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link AkkaKvStateLocationLookupService}.
+ */
+public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
+
+	/** The default timeout. */
+	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+
+	/** Test actor system shared between the tests. */
+	private static ActorSystem testActorSystem;
+
+	@BeforeClass
+	public static void setUp() throws Exception {
+		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+		}
+	}
+
+	/**
+	 * Tests responses if no leader notification has been reported or leadership
+	 * has been lost (leaderAddress = <code>null</code>).
+	 */
+	@Test
+	public void testNoJobManagerRegistered() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		//
+		// No leader registered initially => fail with UnknownJobManager
+		//
+		try {
+			JobID jobId = new JobID();
+			String name = "coffee";
+
+			Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
+
+			Await.result(locationFuture, TIMEOUT);
+			fail("Did not throw expected Exception");
+		} catch (UnknownJobManager ignored) {
+			// Expected
+		}
+
+		assertEquals("Received unexpected lookup", 0, received.size());
+
+		//
+		// Leader registration => communicate with new leader
+		//
+		UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
+		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
+
+		ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
+
+		String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+		// Notify the service about a leader
+		leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
+
+		JobID jobId = new JobID();
+		String name = "tea";
+
+		// Verify that the leader response is handled
+		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
+		assertEquals(expected, location);
+
+		// Verify that the correct message was sent to the leader
+		assertEquals(1, received.size());
+
+		verifyLookupMsg(received.poll(), jobId, name);
+
+		//
+		// Leader loss => fail with UnknownJobManager
+		//
+		leaderRetrievalService.notifyListener(null, null);
+
+		try {
+			Future<KvStateLocation> locationFuture = lookupService
+					.getKvStateLookupInfo(new JobID(), "coffee");
+
+			Await.result(locationFuture, TIMEOUT);
+			fail("Did not throw expected Exception");
+		} catch (UnknownJobManager ignored) {
+			// Expected
+		}
+
+		// No new messages received
+		assertEquals(0, received.size());
+	}
+
+	/**
+	 * Tests that messages are properly decorated with the leader session ID.
+	 */
+	@Test
+	public void testLeaderSessionIdChange() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		// Create test actors with random leader session IDs
+		KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
+		UUID leaderSessionId1 = UUID.randomUUID();
+		ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
+		String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
+
+		KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
+		UUID leaderSessionId2 = UUID.randomUUID();
+		ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId1, expected2);
+		String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
+
+		JobID jobId = new JobID();
+
+		//
+		// Notify about first leader
+		//
+		leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
+
+		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
+		assertEquals(expected1, location);
+
+		assertEquals(1, received.size());
+		verifyLookupMsg(received.poll(), jobId, "rock");
+
+		//
+		// Notify about second leader
+		//
+		leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
+
+		location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
+		assertEquals(expected2, location);
+
+		assertEquals(1, received.size());
+		verifyLookupMsg(received.poll(), jobId, "roll");
+	}
+
+	/**
+	 * Tests that lookups are retried when no leader notification is available.
+	 */
+	@Test
+	public void testRetryOnUnknownJobManager() throws Exception {
+		final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> retryStrategies = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService.LookupRetryStrategyFactory retryStrategy =
+				new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
+					@Override
+					public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
+						return retryStrategies.poll();
+					}
+				};
+
+		final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			null,
+			null);
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				retryStrategy);
+
+		lookupService.start();
+
+		//
+		// Test call to retry
+		//
+		final AtomicBoolean hasRetried = new AtomicBoolean();
+		retryStrategies.add(
+				new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
+					@Override
+					public FiniteDuration getRetryDelay() {
+						return FiniteDuration.Zero();
+					}
+
+					@Override
+					public boolean tryRetry() {
+						if (hasRetried.compareAndSet(false, true)) {
+							return true;
+						}
+						return false;
+					}
+				});
+
+		Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
+
+		Await.ready(locationFuture, TIMEOUT);
+		assertTrue("Did not retry ", hasRetried.get());
+
+		//
+		// Test leader notification after retry
+		//
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
+		ActorRef testActor = LookupResponseActor.create(received, null, expected);
+		final String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+		retryStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
+			@Override
+			public FiniteDuration getRetryDelay() {
+				return FiniteDuration.apply(100, TimeUnit.MILLISECONDS);
+			}
+
+			@Override
+			public boolean tryRetry() {
+				leaderRetrievalService.notifyListener(testActorAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
+				return true;
+			}
+		});
+
+		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(new JobID(), "yessir"), TIMEOUT);
+		assertEquals(expected, location);
+	}
+
+	@Test
+	public void testUnexpectedResponseType() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+				leaderRetrievalService,
+				testActorSystem,
+				TIMEOUT,
+				new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		// Create test actors with random leader session IDs
+		String expected = "unexpected-response-type";
+		ActorRef testActor = LookupResponseActor.create(received, null, expected);
+		String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+		leaderRetrievalService.notifyListener(testActorAddress, null);
+
+		try {
+			Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
+			fail("Did not throw expected Exception");
+		} catch (Throwable ignored) {
+			// Expected
+		}
+	}
+
+	private static final class LookupResponseActor extends FlinkUntypedActor {
+
+		/** Received lookup messages. */
+		private final Queue<LookupKvStateLocation> receivedLookups;
+
+		/** Responses on KvStateMessage.LookupKvStateLocation messages. */
+		private final Queue<Object> lookupResponses;
+
+		/** The leader session ID. */
+		private UUID leaderSessionId;
+
+		public LookupResponseActor(
+				Queue<LookupKvStateLocation> receivedLookups,
+				UUID leaderSessionId, Object... lookupResponses) {
+
+			this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
+			this.leaderSessionId = leaderSessionId;
+			this.lookupResponses = new ArrayDeque<>();
+
+			if (lookupResponses != null) {
+				for (Object resp : lookupResponses) {
+					this.lookupResponses.add(resp);
+				}
+			}
+		}
+
+		@Override
+		public void handleMessage(Object message) throws Exception {
+			if (message instanceof LookupKvStateLocation) {
+				// Add to received lookups queue
+				receivedLookups.add((LookupKvStateLocation) message);
+
+				Object msg = lookupResponses.poll();
+				if (msg != null) {
+					if (msg instanceof Throwable) {
+						sender().tell(new Status.Failure((Throwable) msg), self());
+					} else {
+						sender().tell(new Status.Success(msg), self());
+					}
+				}
+			} else if (message instanceof UUID) {
+				this.leaderSessionId = (UUID) message;
+			} else {
+				LOG.debug("Received unhandled message: {}", message);
+			}
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return leaderSessionId;
+		}
+
+		private static ActorRef create(
+				Queue<LookupKvStateLocation> receivedLookups,
+				UUID leaderSessionId,
+				Object... lookupResponses) {
+
+			return testActorSystem.actorOf(Props.create(
+					LookupResponseActor.class,
+					receivedLookups,
+					leaderSessionId,
+					lookupResponses));
+		}
+	}
+
+	private static void verifyLookupMsg(
+			LookupKvStateLocation lookUpMsg,
+			JobID expectedJobId,
+			String expectedName) {
+
+		assertNotNull(lookUpMsg);
+		assertEquals(expectedJobId, lookUpMsg.getJobId());
+		assertEquals(expectedName, lookUpMsg.getRegistrationName());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
new file mode 100644
index 0000000..0b97bda
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.client.KvStateClientHandler;
+import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+
+import org.junit.Test;
+
+import java.nio.channels.ClosedChannelException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link KvStateClientHandler}.
+ */
+public class KvStateClientHandlerTest {
+
+	/**
+	 * Tests that on reads the expected callback methods are called and read
+	 * buffers are recycled.
+	 */
+	@Test
+	public void testReadCallbacksAndBufferRecycling() throws Exception {
+		KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class);
+
+		EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback));
+
+		//
+		// Request success
+		//
+		ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
+				channel.alloc(),
+				1222112277,
+				new byte[0]);
+		buf.skipBytes(4); // skip frame length
+
+		// Verify callback
+		channel.writeInbound(buf);
+		verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class));
+		assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+		//
+		// Request failure
+		//
+		buf = MessageSerializer.serializeKvStateRequestFailure(
+				channel.alloc(),
+				1222112278,
+				new RuntimeException("Expected test Exception"));
+		buf.skipBytes(4); // skip frame length
+
+		// Verify callback
+		channel.writeInbound(buf);
+		verify(callback, times(1)).onRequestFailure(eq(1222112278L), any(RuntimeException.class));
+		assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+		//
+		// Server failure
+		//
+		buf = MessageSerializer.serializeServerFailure(
+				channel.alloc(),
+				new RuntimeException("Expected test Exception"));
+		buf.skipBytes(4); // skip frame length
+
+		// Verify callback
+		channel.writeInbound(buf);
+		verify(callback, times(1)).onFailure(any(RuntimeException.class));
+
+		//
+		// Unexpected messages
+		//
+		buf = channel.alloc().buffer(4).writeInt(1223823);
+
+		// Verify callback
+		channel.writeInbound(buf);
+		verify(callback, times(2)).onFailure(any(IllegalStateException.class));
+		assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+		//
+		// Exception caught
+		//
+		channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
+		verify(callback, times(3)).onFailure(any(RuntimeException.class));
+
+		//
+		// Channel inactive
+		//
+		channel.pipeline().fireChannelInactive();
+		verify(callback, times(4)).onFailure(any(ClosedChannelException.class));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
new file mode 100644
index 0000000..a2850b3
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.client.KvStateClient;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KvStateClient}.
+ */
+public class KvStateClientTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class);
+
+	// Thread pool for client bootstrap (shared between tests)
+	private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+
+	private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (NIO_GROUP != null) {
+			NIO_GROUP.shutdownGracefully();
+		}
+	}
+
+	/**
+	 * Tests simple queries, of which half succeed and half fail.
+	 */
+	@Test
+	public void testSimpleRequests() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateClient client = null;
+		Channel serverChannel = null;
+
+		try {
+			client = new KvStateClient(1, stats);
+
+			// Random result
+			final byte[] expected = new byte[1024];
+			ThreadLocalRandom.current().nextBytes(expected);
+
+			final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+			final AtomicReference<Channel> channel = new AtomicReference<>();
+
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelActive(ChannelHandlerContext ctx) throws Exception {
+					channel.set(ctx.channel());
+				}
+
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					received.add((ByteBuf) msg);
+				}
+			});
+
+			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			List<Future<byte[]>> futures = new ArrayList<>();
+
+			int numQueries = 1024;
+
+			for (int i = 0; i < numQueries; i++) {
+				futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
+			}
+
+			// Respond to messages
+			Exception testException = new RuntimeException("Expected test Exception");
+
+			for (int i = 0; i < numQueries; i++) {
+				ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				assertNotNull("Receive timed out", buf);
+
+				Channel ch = channel.get();
+				assertNotNull("Channel not active", ch);
+
+				assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+				KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+
+				buf.release();
+
+				if (i % 2 == 0) {
+					ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
+							serverChannel.alloc(),
+							request.getRequestId(),
+							expected);
+
+					ch.writeAndFlush(response);
+				} else {
+					ByteBuf response = MessageSerializer.serializeKvStateRequestFailure(
+							serverChannel.alloc(),
+							request.getRequestId(),
+							testException);
+
+					ch.writeAndFlush(response);
+				}
+			}
+
+			for (int i = 0; i < numQueries; i++) {
+				if (i % 2 == 0) {
+					byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft());
+					assertArrayEquals(expected, serializedResult);
+				} else {
+					try {
+						Await.result(futures.get(i), deadline.timeLeft());
+						fail("Did not throw expected Exception");
+					} catch (RuntimeException ignored) {
+						// Expected
+					}
+				}
+			}
+
+			assertEquals(numQueries, stats.getNumRequests());
+			int expectedRequests = numQueries / 2;
+
+			// Counts can take some time to propagate
+			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests ||
+					stats.getNumFailed() != expectedRequests)) {
+				Thread.sleep(100);
+			}
+
+			assertEquals(expectedRequests, stats.getNumSuccessful());
+			assertEquals(expectedRequests, stats.getNumFailed());
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			assertEquals("Channel leak", 0, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests that a request to an unavailable host is failed with ConnectException.
+	 */
+	@Test
+	public void testRequestUnavailableHost() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+		KvStateClient client = null;
+
+		try {
+			client = new KvStateClient(1, stats);
+
+			int availablePort = NetUtils.getAvailablePort();
+
+			KvStateServerAddress serverAddress = new KvStateServerAddress(
+					InetAddress.getLocalHost(),
+					availablePort);
+
+			Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
+
+			try {
+				Await.result(future, deadline.timeLeft());
+				fail("Did not throw expected ConnectException");
+			} catch (ConnectException ignored) {
+				// Expected
+			}
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			assertEquals("Channel leak", 0, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Multiple threads concurrently fire queries.
+	 */
+	@Test
+	public void testConcurrentQueries() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		ExecutorService executor = null;
+		KvStateClient client = null;
+		Channel serverChannel = null;
+
+		final byte[] serializedResult = new byte[1024];
+		ThreadLocalRandom.current().nextBytes(serializedResult);
+
+		try {
+			int numQueryTasks = 4;
+			final int numQueriesPerTask = 1024;
+
+			executor = Executors.newFixedThreadPool(numQueryTasks);
+
+			client = new KvStateClient(1, stats);
+
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					ByteBuf buf = (ByteBuf) msg;
+					assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+					KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+
+					buf.release();
+
+					ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
+							ctx.alloc(),
+							request.getRequestId(),
+							serializedResult);
+
+					ctx.channel().writeAndFlush(response);
+				}
+			});
+
+			final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			final KvStateClient finalClient = client;
+			Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() {
+				@Override
+				public List<Future<byte[]>> call() throws Exception {
+					List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask);
+
+					for (int i = 0; i < numQueriesPerTask; i++) {
+						results.add(finalClient.getKvState(
+								serverAddress,
+								new KvStateID(),
+								new byte[0]));
+					}
+
+					return results;
+				}
+			};
+
+			// Submit query tasks
+			List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>();
+			for (int i = 0; i < numQueryTasks; i++) {
+				futures.add(executor.submit(queryTask));
+			}
+
+			// Verify results
+			for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) {
+				List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+				for (Future<byte[]> result : results) {
+					byte[] actual = Await.result(result, deadline.timeLeft());
+					assertArrayEquals(serializedResult, actual);
+				}
+			}
+
+			int totalQueries = numQueryTasks * numQueriesPerTask;
+
+			// Counts can take some time to propagate
+			while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) {
+				Thread.sleep(100);
+			}
+
+			assertEquals(totalQueries, stats.getNumRequests());
+			assertEquals(totalQueries, stats.getNumSuccessful());
+		} finally {
+			if (executor != null) {
+				executor.shutdown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			if (client != null) {
+				client.shutDown();
+			}
+
+			assertEquals("Channel leak", 0, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests that a server failure closes the connection and removes it from
+	 * the established connections.
+	 */
+	@Test
+	public void testFailureClosesChannel() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateClient client = null;
+		Channel serverChannel = null;
+
+		try {
+			client = new KvStateClient(1, stats);
+
+			final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+			final AtomicReference<Channel> channel = new AtomicReference<>();
+
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelActive(ChannelHandlerContext ctx) throws Exception {
+					channel.set(ctx.channel());
+				}
+
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					received.add((ByteBuf) msg);
+				}
+			});
+
+			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			// Requests
+			List<Future<byte[]>> futures = new ArrayList<>();
+			futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
+			futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
+
+			ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertNotNull("Receive timed out", buf);
+			buf.release();
+
+			buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+			assertNotNull("Receive timed out", buf);
+			buf.release();
+
+			assertEquals(1, stats.getNumConnections());
+
+			Channel ch = channel.get();
+			assertNotNull("Channel not active", ch);
+
+			// Respond with failure
+			ch.writeAndFlush(MessageSerializer.serializeServerFailure(
+					serverChannel.alloc(),
+					new RuntimeException("Expected test server failure")));
+
+			try {
+				Await.result(futures.remove(0), deadline.timeLeft());
+				fail("Did not throw expected server failure");
+			} catch (RuntimeException ignored) {
+				// Expected
+			}
+
+			try {
+				Await.result(futures.remove(0), deadline.timeLeft());
+				fail("Did not throw expected server failure");
+			} catch (RuntimeException ignored) {
+				// Expected
+			}
+
+			assertEquals(0, stats.getNumConnections());
+
+			// Counts can take some time to propagate
+			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
+					stats.getNumFailed() != 2)) {
+				Thread.sleep(100);
+			}
+
+			assertEquals(2, stats.getNumRequests());
+			assertEquals(0, stats.getNumSuccessful());
+			assertEquals(2, stats.getNumFailed());
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			assertEquals("Channel leak", 0, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests that a server channel close, closes the connection and removes it
+	 * from the established connections.
+	 */
+	@Test
+	public void testServerClosesChannel() throws Exception {
+		Deadline deadline = TEST_TIMEOUT.fromNow();
+		AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+		KvStateClient client = null;
+		Channel serverChannel = null;
+
+		try {
+			client = new KvStateClient(1, stats);
+
+			final AtomicBoolean received = new AtomicBoolean();
+			final AtomicReference<Channel> channel = new AtomicReference<>();
+
+			serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+				@Override
+				public void channelActive(ChannelHandlerContext ctx) throws Exception {
+					channel.set(ctx.channel());
+				}
+
+				@Override
+				public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+					received.set(true);
+				}
+			});
+
+			KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+			// Requests
+			Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
+
+			while (!received.get() && deadline.hasTimeLeft()) {
+				Thread.sleep(50);
+			}
+			assertTrue("Receive timed out", received.get());
+
+			assertEquals(1, stats.getNumConnections());
+
+			channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+			try {
+				Await.result(future, deadline.timeLeft());
+				fail("Did not throw expected server failure");
+			} catch (ClosedChannelException ignored) {
+				// Expected
+			}
+
+			assertEquals(0, stats.getNumConnections());
+
+			// Counts can take some time to propagate
+			while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
+					stats.getNumFailed() != 1)) {
+				Thread.sleep(100);
+			}
+
+			assertEquals(1, stats.getNumRequests());
+			assertEquals(0, stats.getNumSuccessful());
+			assertEquals(1, stats.getNumFailed());
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			if (serverChannel != null) {
+				serverChannel.close();
+			}
+
+			assertEquals("Channel leak", 0, stats.getNumConnections());
+		}
+	}
+
+	/**
+	 * Tests multiple clients querying multiple servers until 100k queries have
+	 * been processed. At this point, the client is shut down and its verified
+	 * that all ongoing requests are failed.
+	 */
+	@Test
+	public void testClientServerIntegration() throws Exception {
+		// Config
+		final int numServers = 2;
+		final int numServerEventLoopThreads = 2;
+		final int numServerQueryThreads = 2;
+
+		final int numClientEventLoopThreads = 4;
+		final int numClientsTasks = 8;
+
+		final int batchSize = 16;
+
+		final int numKeyGroups = 1;
+
+		AbstractStateBackend abstractBackend = new MemoryStateBackend();
+		KvStateRegistry dummyRegistry = new KvStateRegistry();
+		DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+		dummyEnv.setKvStateRegistry(dummyRegistry);
+
+		AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+				dummyEnv,
+				new JobID(),
+				"test_op",
+				IntSerializer.INSTANCE,
+				numKeyGroups,
+				new KeyGroupRange(0, 0),
+				dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+		final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+		AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+		KvStateClient client = null;
+		ExecutorService clientTaskExecutor = null;
+		final KvStateServer[] server = new KvStateServer[numServers];
+
+		try {
+			client = new KvStateClient(numClientEventLoopThreads, clientStats);
+			clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
+
+			// Create state
+			ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+			desc.setQueryable("any");
+
+			// Create servers
+			KvStateRegistry[] registry = new KvStateRegistry[numServers];
+			AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+			final KvStateID[] ids = new KvStateID[numServers];
+
+			for (int i = 0; i < numServers; i++) {
+				registry[i] = new KvStateRegistry();
+				serverStats[i] = new AtomicKvStateRequestStats();
+				server[i] = new KvStateServerImpl(
+						InetAddress.getLocalHost(),
+						0,
+						numServerEventLoopThreads,
+						numServerQueryThreads,
+						registry[i],
+						serverStats[i]);
+
+				server[i].start();
+
+				backend.setCurrentKey(1010 + i);
+
+				// Value per server
+				ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE,
+						desc);
+
+				state.update(201 + i);
+
+				// we know it must be a KvStat but this is not exposed to the user via State
+				InternalKvState<?> kvState = (InternalKvState<?>) state;
+
+				// Register KvState (one state instance for all server)
+				ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+			}
+
+			final KvStateClient finalClient = client;
+			Callable<Void> queryTask = new Callable<Void>() {
+				@Override
+				public Void call() throws Exception {
+					while (true) {
+						if (Thread.interrupted()) {
+							throw new InterruptedException();
+						}
+
+						// Random server permutation
+						List<Integer> random = new ArrayList<>();
+						for (int j = 0; j < batchSize; j++) {
+							random.add(j);
+						}
+						Collections.shuffle(random);
+
+						// Dispatch queries
+						List<Future<byte[]>> futures = new ArrayList<>(batchSize);
+
+						for (int j = 0; j < batchSize; j++) {
+							int targetServer = random.get(j) % numServers;
+
+							byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+									1010 + targetServer,
+									IntSerializer.INSTANCE,
+									VoidNamespace.INSTANCE,
+									VoidNamespaceSerializer.INSTANCE);
+
+							futures.add(finalClient.getKvState(
+									server[targetServer].getAddress(),
+									ids[targetServer],
+									serializedKeyAndNamespace));
+						}
+
+						// Verify results
+						for (int j = 0; j < batchSize; j++) {
+							int targetServer = random.get(j) % numServers;
+
+							Future<byte[]> future = futures.get(j);
+							byte[] buf = Await.result(future, timeout);
+							int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
+							assertEquals(201 + targetServer, value);
+						}
+					}
+				}
+			};
+
+			// Submit tasks
+			List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>();
+			for (int i = 0; i < numClientsTasks; i++) {
+				taskFutures.add(clientTaskExecutor.submit(queryTask));
+			}
+
+			long numRequests;
+			while ((numRequests = clientStats.getNumRequests()) < 100_000) {
+				Thread.sleep(100);
+				LOG.info("Number of requests {}/100_000", numRequests);
+			}
+
+			// Shut down
+			client.shutDown();
+
+			for (java.util.concurrent.Future<Void> future : taskFutures) {
+				try {
+					future.get();
+					fail("Did not throw expected Exception after shut down");
+				} catch (ExecutionException t) {
+					if (t.getCause() instanceof ClosedChannelException ||
+							t.getCause() instanceof IllegalStateException) {
+						// Expected
+					} else {
+						t.printStackTrace();
+						fail("Failed with unexpected Exception type: " + t.getClass().getName());
+					}
+				}
+			}
+
+			assertEquals("Connection leak (client)", 0, clientStats.getNumConnections());
+			for (int i = 0; i < numServers; i++) {
+				boolean success = false;
+				int numRetries = 0;
+				while (!success) {
+					try {
+						assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections());
+						success = true;
+					} catch (Throwable t) {
+						if (numRetries < 10) {
+							LOG.info("Retrying connection leak check (server)");
+							Thread.sleep((numRetries + 1) * 50);
+							numRetries++;
+						} else {
+							throw t;
+						}
+					}
+				}
+			}
+		} finally {
+			if (client != null) {
+				client.shutDown();
+			}
+
+			for (int i = 0; i < numServers; i++) {
+				if (server[i] != null) {
+					server[i].shutDown();
+				}
+			}
+
+			if (clientTaskExecutor != null) {
+				clientTaskExecutor.shutdown();
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException {
+		ServerBootstrap bootstrap = new ServerBootstrap()
+				// Bind address and port
+				.localAddress(InetAddress.getLocalHost(), 0)
+				// NIO server channels
+				.group(NIO_GROUP)
+				.channel(NioServerSocketChannel.class)
+				// See initializer for pipeline details
+				.childHandler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					protected void initChannel(SocketChannel ch) throws Exception {
+						ch.pipeline()
+								.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
+								.addLast(handlers);
+					}
+				});
+
+		return bootstrap.bind().sync().channel();
+	}
+
+	private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
+		InetSocketAddress localAddress = (InetSocketAddress) serverChannel.localAddress();
+
+		return new KvStateServerAddress(localAddress.getAddress(), localAddress.getPort());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
new file mode 100644
index 0000000..f28ca68
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KvStateSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class KvStateRequestSerializerTest {
+
+	private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+	@Parameterized.Parameters
+	public static Collection<Boolean> parameters() {
+		return Arrays.asList(false, true);
+	}
+
+	@Parameterized.Parameter
+	public boolean async;
+
+	/**
+	 * Tests KvState request serialization.
+	 */
+	@Test
+	public void testKvStateRequestSerialization() throws Exception {
+		long requestId = Integer.MAX_VALUE + 1337L;
+		KvStateID kvStateId = new KvStateID();
+		byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+		ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+				alloc,
+				requestId,
+				kvStateId,
+				serializedKeyAndNamespace);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+		KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+		assertEquals(buf.readerIndex(), frameLength + 4);
+
+		assertEquals(requestId, request.getRequestId());
+		assertEquals(kvStateId, request.getKvStateId());
+		assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+	}
+
+	/**
+	 * Tests KvState request serialization with zero-length serialized key and namespace.
+	 */
+	@Test
+	public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+		byte[] serializedKeyAndNamespace = new byte[0];
+
+		ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+				alloc,
+				1823,
+				new KvStateID(),
+				serializedKeyAndNamespace);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+		KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+		assertEquals(buf.readerIndex(), frameLength + 4);
+
+		assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+	}
+
+	/**
+	 * Tests that we don't try to be smart about <code>null</code> key and namespace.
+	 * They should be treated explicitly.
+	 */
+	@Test(expected = NullPointerException.class)
+	public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+		new KvStateRequest(0, new KvStateID(), null);
+	}
+
+	/**
+	 * Tests KvState request result serialization.
+	 */
+	@Test
+	public void testKvStateRequestResultSerialization() throws Exception {
+		long requestId = Integer.MAX_VALUE + 72727278L;
+		byte[] serializedResult = randomByteArray(1024);
+
+		ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
+				alloc,
+				requestId,
+				serializedResult);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
+		assertEquals(buf.readerIndex(), frameLength + 4);
+
+		assertEquals(requestId, request.getRequestId());
+
+		assertArrayEquals(serializedResult, request.getSerializedResult());
+	}
+
+	/**
+	 * Tests KvState request result serialization with zero-length serialized result.
+	 */
+	@Test
+	public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception {
+		byte[] serializedResult = new byte[0];
+
+		ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
+				alloc,
+				72727278,
+				serializedResult);
+
+		int frameLength = buf.readInt();
+
+		assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
+		assertEquals(buf.readerIndex(), frameLength + 4);
+
+		assertArrayEquals(serializedResult, request.getSerializedResult());
+	}
+
+	/**
+	 * Tests that we don't try to be smart about <code>null</code> results.
+	 * They should be treated explicitly.
+	 */
+	@Test(expected = NullPointerException.class)
+	public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+		new KvStateRequestResult(0, null);
+	}
+
+	/**
+	 * Tests KvState request failure serialization.
+	 */
+	@Test
+	public void testKvStateRequestFailureSerialization() throws Exception {
+		long requestId = Integer.MAX_VALUE + 1111222L;
+		IllegalStateException cause = new IllegalStateException("Expected test");
+
+		ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure(
+				alloc,
+				requestId,
+				cause);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+		KvStateRequestFailure request = MessageSerializer.deserializeKvStateRequestFailure(buf);
+		assertEquals(buf.readerIndex(), frameLength + 4);
+
+		assertEquals(requestId, request.getRequestId());
+		assertEquals(cause.getClass(), request.getCause().getClass());
+		assertEquals(cause.getMessage(), request.getCause().getMessage());
+	}
+
+	/**
+	 * Tests KvState server failure serialization.
+	 */
+	@Test
+	public void testServerFailureSerialization() throws Exception {
+		IllegalStateException cause = new IllegalStateException("Expected test");
+
+		ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+		int frameLength = buf.readInt();
+		assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+		Throwable request = MessageSerializer.deserializeServerFailure(buf);
+		assertEquals(buf.readerIndex(), frameLength + 4);
+
+		assertEquals(cause.getClass(), request.getClass());
+		assertEquals(cause.getMessage(), request.getMessage());
+	}
+
+	private byte[] randomByteArray(int capacity) {
+		byte[] bytes = new byte[capacity];
+		ThreadLocalRandom.current().nextBytes(bytes);
+		return bytes;
+	}
+}