You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:01:54 UTC

[05/13] flink git commit: [FLINK-7908][QS] Restructure the queryable state module.

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
deleted file mode 100644
index 3283295..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.queryablestate.client.state.ImmutableListState;
-import org.apache.flink.runtime.state.heap.HeapListState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableListState}.
- */
-public class ImmutableListStateTest {
-
-	private final ListStateDescriptor<Long> listStateDesc =
-			new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
-
-	private ImmutableListState<Long> listState;
-
-	@Before
-	public void setUp() throws Exception {
-		if (!listStateDesc.isSerializerInitialized()) {
-			listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
-
-		List<Long> init = new ArrayList<>();
-		init.add(42L);
-
-		byte[] serInit = serializeInitValue(init);
-		listState = ImmutableListState.createState(listStateDesc, serInit);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
-		List<Long> list = getStateContents();
-		assertEquals(1L, list.size());
-
-		long element = list.get(0);
-		assertEquals(42L, element);
-
-		listState.add(54L);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
-		List<Long> list = getStateContents();
-		assertEquals(1L, list.size());
-
-		long element = list.get(0);
-		assertEquals(42L, element);
-
-		listState.clear();
-	}
-
-	/**
-	 * Copied from {@link HeapListState#getSerializedValue(Object, Object)}.
-	 */
-	private byte[] serializeInitValue(List<Long> toSerialize) throws IOException {
-		TypeSerializer<Long> serializer = listStateDesc.getElementSerializer();
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);
-
-		// write the same as RocksDB writes lists, with one ',' separator
-		for (int i = 0; i < toSerialize.size(); i++) {
-			serializer.serialize(toSerialize.get(i), view);
-			if (i < toSerialize.size() - 1) {
-				view.writeByte(',');
-			}
-		}
-		view.flush();
-
-		return baos.toByteArray();
-	}
-
-	private List<Long> getStateContents() {
-		List<Long> list = new ArrayList<>();
-		for (Long elem: listState.get()) {
-			list.add(elem);
-		}
-		return list;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
deleted file mode 100644
index 30a8a50..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableMapState;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests the {@link ImmutableMapState}.
- */
-public class ImmutableMapStateTest {
-
-	private final MapStateDescriptor<Long, Long> mapStateDesc =
-			new MapStateDescriptor<>(
-					"test",
-					BasicTypeInfo.LONG_TYPE_INFO,
-					BasicTypeInfo.LONG_TYPE_INFO);
-
-	private ImmutableMapState<Long, Long> mapState;
-
-	@Before
-	public void setUp() throws Exception {
-		if (!mapStateDesc.isSerializerInitialized()) {
-			mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
-
-		Map<Long, Long> initMap = new HashMap<>();
-		initMap.put(1L, 5L);
-		initMap.put(2L, 5L);
-
-		byte[] initSer = KvStateSerializer.serializeMap(
-				initMap.entrySet(),
-				BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
-				BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
-
-		mapState = ImmutableMapState.createState(mapStateDesc, initSer);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testPut() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		mapState.put(2L, 54L);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testPutAll() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		Map<Long, Long> nMap = new HashMap<>();
-		nMap.put(1L, 7L);
-		nMap.put(2L, 7L);
-
-		mapState.putAll(nMap);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		mapState.put(2L, 54L);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testIterator() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		Iterator<Map.Entry<Long, Long>> iterator = mapState.iterator();
-		while (iterator.hasNext()) {
-			iterator.remove();
-		}
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testIterable() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		Iterable<Map.Entry<Long, Long>> iterable = mapState.entries();
-		Iterator<Map.Entry<Long, Long>> iterator = iterable.iterator();
-		while (iterator.hasNext()) {
-			assertEquals(5L, (long) iterator.next().getValue());
-			iterator.remove();
-		}
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testKeys() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		Iterator<Long> iterator = mapState.keys().iterator();
-		while (iterator.hasNext()) {
-			iterator.remove();
-		}
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testValues() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		Iterator<Long> iterator = mapState.values().iterator();
-		while (iterator.hasNext()) {
-			iterator.remove();
-		}
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
-		assertTrue(mapState.contains(1L));
-		long value = mapState.get(1L);
-		assertEquals(5L, value);
-
-		assertTrue(mapState.contains(2L));
-		value = mapState.get(2L);
-		assertEquals(5L, value);
-
-		mapState.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
deleted file mode 100644
index 9b1ecf8..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableReducingState}.
- */
-public class ImmutableReducingStateTest {
-
-	private final ReducingStateDescriptor<Long> reducingStateDesc =
-			new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
-
-	private ImmutableReducingState<Long> reduceState;
-
-	@Before
-	public void setUp() throws Exception {
-		if (!reducingStateDesc.isSerializerInitialized()) {
-			reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
-
-		reduceState = ImmutableReducingState.createState(
-				reducingStateDesc,
-				ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
-		);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
-		long value = reduceState.get();
-		assertEquals(42L, value);
-
-		reduceState.add(54L);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
-		long value = reduceState.get();
-		assertEquals(42L, value);
-
-		reduceState.clear();
-	}
-
-	/**
-	 * Test {@link ReduceFunction} summing up its two arguments.
-	 */
-	private static class SumReduce implements ReduceFunction<Long> {
-
-		private static final long serialVersionUID = 6041237513913189144L;
-
-		@Override
-		public Long reduce(Long value1, Long value2) throws Exception {
-			return value1 + value2;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
deleted file mode 100644
index 5f7032d..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableValueState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Tests the {@link ImmutableValueState}.
- */
-public class ImmutableValueStateTest {
-
-	private final ValueStateDescriptor<Long> valueStateDesc =
-			new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
-
-	private ImmutableValueState<Long> valueState;
-
-	@Before
-	public void setUp() throws Exception {
-		if (!valueStateDesc.isSerializerInitialized()) {
-			valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-		}
-
-		valueState = ImmutableValueState.createState(
-				valueStateDesc,
-				ByteBuffer.allocate(Long.BYTES).putLong(42L).array()
-		);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testUpdate() {
-		long value = valueState.value();
-		assertEquals(42L, value);
-
-		valueState.update(54L);
-	}
-
-	@Test(expected = UnsupportedOperationException.class)
-	public void testClear() {
-		long value = valueState.value();
-		assertEquals(42L, value);
-
-		valueState.clear();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
deleted file mode 100644
index 10792cd..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
-log4j.logger.org.apache.zookeeper=OFF

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
new file mode 100644
index 0000000..f39498e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-queryable-state</artifactId>
+		<version>1.4-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+
+	<artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
+	<name>flink-queryable-state-runtime</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+
+		<!-- core dependencies -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-core</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<!-- ===================================================
+								Testing
+			=================================================== -->
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils-junit</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.curator</groupId>
+			<artifactId>curator-test</artifactId>
+			<version>${curator.version}</version>
+			<scope>test</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+			<scope>test</scope>
+		</dependency>
+
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
new file mode 100644
index 0000000..d434336
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This handler acts as an internal (to the Flink cluster) client that receives
+ * the requests from external clients, executes them by contacting the Job Manager (if necessary) and
+ * the Task Manager holding the requested state, and forwards the answer back to the client.
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);
+
+	/** The proxy using this handler. */
+	private final KvStateClientProxy proxy;
+
+	/** A cache to hold the location of different states for which we have already seen requests. */
+	private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache =
+			new ConcurrentHashMap<>();
+
+	/**
+	 * Network client to forward queries to {@link KvStateServerImpl state server}
+	 * instances inside the cluster.
+	 */
+	private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;
+
+	/**
+	 * Create the handler used by the {@link KvStateClientProxyImpl}.
+	 *
+	 * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler.
+	 * @param queryExecutorThreads the number of threads used to process incoming requests.
+	 * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+	 * @param stats server statistics collector.
+	 */
+	public KvStateClientProxyHandler(
+			final KvStateClientProxyImpl proxy,
+			final int queryExecutorThreads,
+			final MessageSerializer<KvStateRequest, KvStateResponse> serializer,
+			final KvStateRequestStats stats) {
+
+		super(proxy, serializer, stats);
+		this.proxy = Preconditions.checkNotNull(proxy);
+		this.kvStateClient = createInternalClient(queryExecutorThreads);
+	}
+
+	private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<>(
+						new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		return new Client<>(
+				"Queryable State Proxy Client",
+				threads,
+				messageSerializer,
+				new DisabledKvStateRequestStats());
+	}
+
+	@Override
+	public CompletableFuture<KvStateResponse> handleRequest(
+			final long requestId,
+			final KvStateRequest request) {
+		CompletableFuture<KvStateResponse> response = new CompletableFuture<>();
+		executeActionAsync(response, request, false);
+		return response;
+	}
+
+	private void executeActionAsync(
+			final CompletableFuture<KvStateResponse> result,
+			final KvStateRequest request,
+			final boolean update) {
+
+		if (!result.isDone()) {
+			final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);
+			operationFuture.whenCompleteAsync(
+					(t, throwable) -> {
+						if (throwable != null) {
+							if (throwable instanceof CancellationException) {
+								result.completeExceptionally(throwable);
+							} else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+									throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
+									throwable.getCause() instanceof UnknownKvStateLocation ||
+									throwable.getCause() instanceof ConnectException) {
+
+								// These failures are likely to be caused by out-of-sync
+								// KvStateLocation. Therefore we retry this query and
+								// force look up the location.
+
+								executeActionAsync(result, request, true);
+							} else {
+								result.completeExceptionally(throwable);
+							}
+						} else {
+							result.complete(t);
+						}
+					}, queryExecutor);
+
+			result.whenComplete(
+					(t, throwable) -> operationFuture.cancel(false));
+		}
+	}
+
+	private CompletableFuture<KvStateResponse> getState(
+			final KvStateRequest request,
+			final boolean forceUpdate) {
+
+		return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate)
+				.thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> {
+					final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
+							request.getKeyHashCode(), location.getNumKeyGroups());
+
+					final InetSocketAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
+					if (serverAddress == null) {
+						return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
+					} else {
+						// Query server
+						final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
+						final KvStateInternalRequest internalRequest = new KvStateInternalRequest(
+								kvStateId, request.getSerializedKeyAndNamespace());
+						return kvStateClient.sendRequest(serverAddress, internalRequest);
+					}
+				}, queryExecutor);
+	}
+
+	/**
+	 * Looks up the {@link KvStateLocation} for the given job and queryable state name.
+	 *
+	 * <p>The job manager is queried for the location only if forced, if no cached lookup
+	 * exists, or if the cached lookup failed. A cached location may be out of date.
+	 *
+	 * @param jobId              JobID the state instance belongs to.
+	 * @param queryableStateName Name under which the state instance has been published.
+	 * @param forceUpdate        Flag to indicate whether to force an update via the job manager.
+	 * @return Future holding the KvStateLocation
+	 */
+	private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
+			final JobID jobId,
+			final String queryableStateName,
+			final boolean forceUpdate) {
+
+		final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
+		final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
+
+		if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) {
+			LOG.debug("Retrieving location for state={} of job={} from the cache.", queryableStateName, jobId);
+			return cachedFuture;
+		}
+
+		LOG.debug("Retrieving location for state={} of job={} from the job manager.", queryableStateName, jobId);
+
+		return proxy.getJobManagerFuture().thenComposeAsync(
+				jobManagerGateway -> {
+					final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
+					final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
+							jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+									.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
+
+					lookupCache.put(cacheKey, locationFuture);
+					return locationFuture;
+				}, queryExecutor);
+	}
+
+	@Override
+	public void shutdown() {
+		kvStateClient.shutdown();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
new file mode 100644
index 0000000..f473443
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.exceptions.UnknownJobManagerException;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * The default implementation of the {@link KvStateClientProxy}.
+ *
+ * <p>The proxy keeps a future of the current job manager gateway, which can be
+ * replaced through {@link #updateJobManager(CompletableFuture)} and read through
+ * {@link #getJobManagerFuture()}. Both accesses are guarded by the same lock.
+ */
+@Internal
+public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
+
+	/** Failed future handed out while no job manager is known. */
+	private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
+			FutureUtils.getFailedFuture(new UnknownJobManagerException());
+
+	/** Number of threads used to process incoming requests. */
+	private final int queryExecutorThreads;
+
+	/** Statistics collector. */
+	private final KvStateRequestStats stats;
+
+	/** Lock guarding reads and writes of {@link #jobManagerFuture}. */
+	private final Object leaderLock = new Object();
+
+	/** Future of the current job manager gateway; initially {@link #UNKNOWN_JOB_MANAGER}. */
+	private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+
+	/**
+	 * Creates the Queryable State Client Proxy.
+	 *
+	 * <p>The server is instantiated using reflection by the
+	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)
+	 * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)}.
+	 *
+	 * <p>The server needs to be started via {@link #start()} in order to bind
+	 * to the configured bind address.
+	 *
+	 * @param bindAddress the address to listen to.
+	 * @param bindPortIterator the port range to try to bind to.
+	 * @param numEventLoopThreads number of event loop threads.
+	 * @param numQueryThreads number of query threads; must be at least 1.
+	 * @param stats the statistics collector; must not be {@code null}.
+	 */
+	public KvStateClientProxyImpl(
+			final InetAddress bindAddress,
+			final Iterator<Integer> bindPortIterator,
+			final Integer numEventLoopThreads,
+			final Integer numQueryThreads,
+			final KvStateRequestStats stats) {
+
+		super("Queryable State Proxy Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
+		Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+		this.queryExecutorThreads = numQueryThreads;
+		this.stats = Preconditions.checkNotNull(stats);
+	}
+
+	/** Returns the server address as reported by the base server. */
+	@Override
+	public InetSocketAddress getServerAddress() {
+		return super.getServerAddress();
+	}
+
+	/** Starts the server by delegating to the base server. */
+	@Override
+	public void start() throws Throwable {
+		super.start();
+	}
+
+	/** Shuts the server down by delegating to the base server. */
+	@Override
+	public void shutdown() {
+		super.shutdown();
+	}
+
+	/**
+	 * Replaces the job manager future handed out by {@link #getJobManagerFuture()}.
+	 *
+	 * @param leadingJobManager future of the new leading job manager gateway, or
+	 *                          {@code null} to fall back to the failed "unknown job manager" future.
+	 */
+	@Override
+	public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
+		synchronized (leaderLock) {
+			jobManagerFuture = (leadingJobManager != null) ? leadingJobManager : UNKNOWN_JOB_MANAGER;
+		}
+	}
+
+	/**
+	 * Returns the currently stored job manager future.
+	 *
+	 * @return the future last set via {@link #updateJobManager(CompletableFuture)}, or the
+	 *         failed "unknown job manager" future if none is set.
+	 */
+	@Override
+	public CompletableFuture<ActorGateway> getJobManagerFuture() {
+		synchronized (leaderLock) {
+			return jobManagerFuture;
+		}
+	}
+
+	/**
+	 * Creates the handler that processes the requests received by this proxy.
+	 *
+	 * @return a new {@link KvStateClientProxyHandler} bound to this proxy.
+	 */
+	@Override
+	public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() {
+		final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<KvStateRequest, KvStateResponse>(
+						new KvStateRequest.KvStateRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		return new KvStateClientProxyHandler(this, queryExecutorThreads, messageSerializer, stats);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
new file mode 100644
index 0000000..8c8de59
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy
+ * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server}
+ * of the Task Manager responsible for the requested state.
+ *
+ * <p>The serialized form written by {@link #serialize()} consists of the lower and upper
+ * {@code long} parts of the {@link KvStateID}, followed by an {@code int} length and that
+ * many bytes holding the serialized key and namespace.
+ */
+@Internal
+public class KvStateInternalRequest extends MessageBody {
+
+	/** ID of the state instance to query. */
+	private final KvStateID kvStateId;
+
+	/** Serialized key and namespace to query the state instance with. */
+	private final byte[] serializedKeyAndNamespace;
+
+	/**
+	 * Creates a request for the given state instance.
+	 *
+	 * @param stateId the ID of the state instance; must not be {@code null}.
+	 * @param serializedKeyAndNamespace the serialized key and namespace; must not be {@code null}.
+	 */
+	public KvStateInternalRequest(
+			final KvStateID stateId,
+			final byte[] serializedKeyAndNamespace) {
+
+		this.kvStateId = Preconditions.checkNotNull(stateId);
+		this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+	}
+
+	/** Returns the ID of the state instance this request targets. */
+	public KvStateID getKvStateId() {
+		return kvStateId;
+	}
+
+	/** Returns the serialized key and namespace (the internal array, not a copy). */
+	public byte[] getSerializedKeyAndNamespace() {
+		return serializedKeyAndNamespace;
+	}
+
+	/**
+	 * Serializes this request.
+	 *
+	 * @return a byte array holding the state ID, the length of the key and namespace,
+	 *         and the key and namespace bytes, in that order.
+	 */
+	@Override
+	public byte[] serialize() {
+
+		// KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace
+		final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length;
+
+		return ByteBuffer.allocate(size)
+				.putLong(kvStateId.getLowerPart())
+				.putLong(kvStateId.getUpperPart())
+				.putInt(serializedKeyAndNamespace.length)
+				.put(serializedKeyAndNamespace)
+				.array();
+	}
+
+	/**
+	 * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}.
+	 */
+	public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> {
+
+		/**
+		 * Reads a {@link KvStateInternalRequest} from the given buffer.
+		 *
+		 * @param buf the buffer positioned at the start of the serialized request.
+		 * @return the deserialized request.
+		 * @throws IllegalArgumentException if the encoded length of the key and namespace is
+		 *         negative or larger than the number of bytes remaining in the buffer.
+		 */
+		@Override
+		public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
+			KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
+
+			int length = buf.readInt();
+			Preconditions.checkArgument(length >= 0,
+					"Negative length for key and namespace. " +
+							"This indicates a serialization error.");
+
+			// The length is read off the wire: validate it against the bytes actually
+			// available before allocating, so a corrupt or malicious value cannot force
+			// an arbitrarily large array allocation.
+			Preconditions.checkArgument(length <= buf.readableBytes(),
+					"Length of key and namespace exceeds the remaining bytes in the buffer. " +
+							"This indicates a serialization error.");
+
+			byte[] serializedKeyAndNamespace = new byte[length];
+			if (length > 0) {
+				buf.readBytes(serializedKeyAndNamespace);
+			}
+			return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
new file mode 100644
index 0000000..476f153
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.server;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler dispatches asynchronous tasks, which query {@link InternalKvState}
+ * instances and write the result to the channel.
+ *
+ * <p>The network threads receive the message, deserialize it and dispatch the
+ * query task. The actual query is handled in a separate thread as it might
+ * otherwise block the network threads (file I/O etc.).
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
+
+	/** KvState registry holding references to the KvState instances. */
+	private final KvStateRegistry registry;
+
+	/**
+	 * Create the handler used by the {@link KvStateServerImpl}.
+	 *
+	 * @param server the {@link KvStateServerImpl} using the handler.
+	 * @param kvStateRegistry registry to query.
+	 * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+	 * @param stats server statistics collector.
+	 */
+	public KvStateServerHandler(
+			final KvStateServerImpl server,
+			final KvStateRegistry kvStateRegistry,
+			final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
+			final KvStateRequestStats stats) {
+
+		super(server, serializer, stats);
+		this.registry = Preconditions.checkNotNull(kvStateRegistry);
+	}
+
+	@Override
+	public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) {
+		final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
+
+		try {
+			final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+			if (kvState == null) {
+				responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
+			} else {
+				byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
+
+				byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+				if (serializedResult != null) {
+					responseFuture.complete(new KvStateResponse(serializedResult));
+				} else {
+					responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
+				}
+			}
+			return responseFuture;
+		} catch (Throwable t) {
+			String errMsg = "Error while processing request with ID " + requestId +
+					". Caused by: " + ExceptionUtils.stringifyException(t);
+			responseFuture.completeExceptionally(new RuntimeException(errMsg));
+			return responseFuture;
+		}
+	}
+
+	@Override
+	public void shutdown() {
+		// do nothing
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
new file mode 100644
index 0000000..fe07687
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.server;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+
+/**
+ * The default implementation of the {@link KvStateServer}.
+ *
+ * <p>The {@link MessageSerializer} exposed through {@link #getSerializer()} is created
+ * when the handler is initialized, not at construction time.
+ */
+@Internal
+public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
+
+	/** The {@link KvStateRegistry} to query for state instances. */
+	private final KvStateRegistry kvStateRegistry;
+
+	/** Statistics collector passed on to the handler. */
+	private final KvStateRequestStats stats;
+
+	/** Serializer shared with the handler; {@code null} until {@link #initializeHandler()} ran. */
+	private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
+
+	/**
+	 * Creates the state server.
+	 *
+	 * <p>The server is instantiated using reflection by the
+	 * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
+	 * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}.
+	 *
+	 * <p>The server needs to be started via {@link #start()} in order to bind
+	 * to the configured bind address.
+	 *
+	 * @param bindAddress the address to listen to.
+	 * @param bindPortIterator the port range to try to bind to.
+	 * @param numEventLoopThreads number of event loop threads.
+	 * @param numQueryThreads number of query threads.
+	 * @param kvStateRegistry {@link KvStateRegistry} to query for state instances; must not be {@code null}.
+	 * @param stats the statistics collector; must not be {@code null}.
+	 */
+	public KvStateServerImpl(
+			final InetAddress bindAddress,
+			final Iterator<Integer> bindPortIterator,
+			final Integer numEventLoopThreads,
+			final Integer numQueryThreads,
+			final KvStateRegistry kvStateRegistry,
+			final KvStateRequestStats stats) {
+
+		super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
+		this.stats = Preconditions.checkNotNull(stats);
+		this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+	}
+
+	/**
+	 * Creates the message serializer, stores it for {@link #getSerializer()} and builds
+	 * the handler that uses it.
+	 *
+	 * @return a new {@link KvStateServerHandler} bound to this server.
+	 */
+	@Override
+	public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
+		final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+				new MessageSerializer<KvStateInternalRequest, KvStateResponse>(
+						new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+						new KvStateResponse.KvStateResponseDeserializer());
+
+		this.serializer = messageSerializer;
+		return new KvStateServerHandler(this, kvStateRegistry, messageSerializer, stats);
+	}
+
+	/**
+	 * Returns the serializer created by {@link #initializeHandler()}.
+	 *
+	 * @return the message serializer of this server.
+	 * @throws IllegalStateException if the serializer has not been created yet.
+	 */
+	public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
+		final boolean initialized = serializer != null;
+		Preconditions.checkState(initialized, "Server " + getServerName() + " has not been started.");
+		return serializer;
+	}
+
+	/** Starts the server by delegating to the base server. */
+	@Override
+	public void start() throws Throwable {
+		super.start();
+	}
+
+	/** Returns the server address as reported by the base server. */
+	@Override
+	public InetSocketAddress getServerAddress() {
+		return super.getServerAddress();
+	}
+
+	/** Shuts the server down by delegating to the base server. */
+	@Override
+	public void shutdown() {
+		super.shutdown();
+	}
+}