You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/26 17:01:54 UTC
[05/13] flink git commit: [FLINK-7908][QS] Restructure the queryable
state module.
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
deleted file mode 100644
index 3283295..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableListStateTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.queryablestate.client.state.ImmutableListState;
-import org.apache.flink.runtime.state.heap.HeapListState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Verifies that an {@link ImmutableListState} can be read but rejects every modification.
- */
-public class ImmutableListStateTest {
-
-    private final ListStateDescriptor<Long> listStateDesc =
-        new ListStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
-
-    private ImmutableListState<Long> listState;
-
-    /**
-     * Creates the state under test from a serialized list holding the single element {@code 42}.
-     */
-    @Before
-    public void setUp() throws Exception {
-        if (!listStateDesc.isSerializerInitialized()) {
-            listStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-        }
-
-        final List<Long> initialElements = new ArrayList<>();
-        initialElements.add(42L);
-
-        listState = ImmutableListState.createState(listStateDesc, serializeInitValue(initialElements));
-    }
-
-    /** Adding an element must fail with an {@link UnsupportedOperationException}. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testUpdate() {
-        assertSingleInitialElement();
-
-        listState.add(54L);
-    }
-
-    /** Clearing the state must fail with an {@link UnsupportedOperationException}. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testClear() {
-        assertSingleInitialElement();
-
-        listState.clear();
-    }
-
-    /**
-     * Checks that the state still holds exactly the element written in {@link #setUp()}.
-     */
-    private void assertSingleInitialElement() {
-        final List<Long> contents = getStateContents();
-        assertEquals(1L, contents.size());
-
-        final long onlyElement = contents.get(0);
-        assertEquals(42L, onlyElement);
-    }
-
-    /**
-     * Copied from {@link HeapListState#getSerializedValue(Object, Object)}.
-     */
-    private byte[] serializeInitValue(List<Long> toSerialize) throws IOException {
-        final TypeSerializer<Long> elementSerializer = listStateDesc.getElementSerializer();
-
-        final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-        final DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(bytes);
-
-        // write the same as RocksDB writes lists: elements joined by a single ',' byte,
-        // i.e. a separator in front of every element except the first one
-        boolean first = true;
-        for (Long element : toSerialize) {
-            if (!first) {
-                out.writeByte(',');
-            }
-            elementSerializer.serialize(element, out);
-            first = false;
-        }
-        out.flush();
-
-        return bytes.toByteArray();
-    }
-
-    /** Copies everything the state currently exposes into a fresh list. */
-    private List<Long> getStateContents() {
-        final List<Long> contents = new ArrayList<>();
-        listState.get().forEach(contents::add);
-        return contents;
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
deleted file mode 100644
index 30a8a50..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableMapStateTest.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableMapState;
-import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Verifies that an {@link ImmutableMapState} can be read but rejects every modification,
- * including removals through the iterators it hands out.
- */
-public class ImmutableMapStateTest {
-
-    private final MapStateDescriptor<Long, Long> mapStateDesc =
-        new MapStateDescriptor<>(
-            "test",
-            BasicTypeInfo.LONG_TYPE_INFO,
-            BasicTypeInfo.LONG_TYPE_INFO);
-
-    private ImmutableMapState<Long, Long> mapState;
-
-    /**
-     * Creates the state under test from the serialized map {@code {1 -> 5, 2 -> 5}}.
-     */
-    @Before
-    public void setUp() throws Exception {
-        if (!mapStateDesc.isSerializerInitialized()) {
-            mapStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-        }
-
-        final Map<Long, Long> initialEntries = new HashMap<>();
-        initialEntries.put(1L, 5L);
-        initialEntries.put(2L, 5L);
-
-        final byte[] serializedEntries = KvStateSerializer.serializeMap(
-            initialEntries.entrySet(),
-            BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()),
-            BasicTypeInfo.LONG_TYPE_INFO.createSerializer(new ExecutionConfig()));
-
-        mapState = ImmutableMapState.createState(mapStateDesc, serializedEntries);
-    }
-
-    /** Putting a single entry must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testPut() {
-        assertInitialEntries();
-
-        mapState.put(2L, 54L);
-    }
-
-    /** Putting a whole map must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testPutAll() {
-        assertInitialEntries();
-
-        final Map<Long, Long> replacement = new HashMap<>();
-        replacement.put(1L, 7L);
-        replacement.put(2L, 7L);
-
-        mapState.putAll(replacement);
-    }
-
-    /** Overwriting an existing entry must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testUpdate() {
-        assertInitialEntries();
-
-        mapState.put(2L, 54L);
-    }
-
-    /** Removing through the state's own iterator must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testIterator() {
-        assertInitialEntries();
-
-        final Iterator<Map.Entry<Long, Long>> entryIterator = mapState.iterator();
-        while (entryIterator.hasNext()) {
-            entryIterator.remove();
-        }
-    }
-
-    /** Removing through the iterator of {@code entries()} must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testIterable() {
-        assertInitialEntries();
-
-        final Iterable<Map.Entry<Long, Long>> entries = mapState.entries();
-        final Iterator<Map.Entry<Long, Long>> entryIterator = entries.iterator();
-        while (entryIterator.hasNext()) {
-            assertEquals(5L, (long) entryIterator.next().getValue());
-            entryIterator.remove();
-        }
-    }
-
-    /** Removing through the iterator of {@code keys()} must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testKeys() {
-        assertInitialEntries();
-
-        final Iterator<Long> keyIterator = mapState.keys().iterator();
-        while (keyIterator.hasNext()) {
-            keyIterator.remove();
-        }
-    }
-
-    /** Removing through the iterator of {@code values()} must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testValues() {
-        assertInitialEntries();
-
-        final Iterator<Long> valueIterator = mapState.values().iterator();
-        while (valueIterator.hasNext()) {
-            valueIterator.remove();
-        }
-    }
-
-    /** Clearing the state must fail. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testClear() {
-        assertInitialEntries();
-
-        mapState.clear();
-    }
-
-    /**
-     * Checks that both entries written in {@link #setUp()} are still readable, in the
-     * order key {@code 1} then key {@code 2}.
-     */
-    private void assertInitialEntries() {
-        assertTrue(mapState.contains(1L));
-        final long firstValue = mapState.get(1L);
-        assertEquals(5L, firstValue);
-
-        assertTrue(mapState.contains(2L));
-        final long secondValue = mapState.get(2L);
-        assertEquals(5L, secondValue);
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
deleted file mode 100644
index 9b1ecf8..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableReducingStateTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableReducingState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Verifies that an {@link ImmutableReducingState} can be read but rejects every modification.
- */
-public class ImmutableReducingStateTest {
-
-    private final ReducingStateDescriptor<Long> reducingStateDesc =
-        new ReducingStateDescriptor<>("test", new SumReduce(), BasicTypeInfo.LONG_TYPE_INFO);
-
-    private ImmutableReducingState<Long> reduceState;
-
-    /**
-     * Creates the state under test from the eight bytes that {@link ByteBuffer#putLong(long)}
-     * writes for the value {@code 42}.
-     */
-    @Before
-    public void setUp() throws Exception {
-        if (!reducingStateDesc.isSerializerInitialized()) {
-            reducingStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-        }
-
-        final byte[] serializedValue = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
-        reduceState = ImmutableReducingState.createState(reducingStateDesc, serializedValue);
-    }
-
-    /** Adding a value must fail with an {@link UnsupportedOperationException}. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testUpdate() {
-        assertInitialValue();
-
-        reduceState.add(54L);
-    }
-
-    /** Clearing the state must fail with an {@link UnsupportedOperationException}. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testClear() {
-        assertInitialValue();
-
-        reduceState.clear();
-    }
-
-    /** Checks that the state still reports the value written in {@link #setUp()}. */
-    private void assertInitialValue() {
-        final long current = reduceState.get();
-        assertEquals(42L, current);
-    }
-
-    /**
-     * Test {@link ReduceFunction} summing up its two arguments.
-     */
-    private static class SumReduce implements ReduceFunction<Long> {
-
-        private static final long serialVersionUID = 6041237513913189144L;
-
-        @Override
-        public Long reduce(Long value1, Long value2) throws Exception {
-            return Long.sum(value1, value2);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
deleted file mode 100644
index 5f7032d..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/state/ImmutableValueStateTest.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.state;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.queryablestate.client.state.ImmutableValueState;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * Verifies that an {@link ImmutableValueState} can be read but rejects every modification.
- */
-public class ImmutableValueStateTest {
-
-    private final ValueStateDescriptor<Long> valueStateDesc =
-        new ValueStateDescriptor<>("test", BasicTypeInfo.LONG_TYPE_INFO);
-
-    private ImmutableValueState<Long> valueState;
-
-    /**
-     * Creates the state under test from the eight bytes that {@link ByteBuffer#putLong(long)}
-     * writes for the value {@code 42}.
-     */
-    @Before
-    public void setUp() throws Exception {
-        if (!valueStateDesc.isSerializerInitialized()) {
-            valueStateDesc.initializeSerializerUnlessSet(new ExecutionConfig());
-        }
-
-        final byte[] serializedValue = ByteBuffer.allocate(Long.BYTES).putLong(42L).array();
-        valueState = ImmutableValueState.createState(valueStateDesc, serializedValue);
-    }
-
-    /** Updating the value must fail with an {@link UnsupportedOperationException}. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testUpdate() {
-        assertInitialValue();
-
-        valueState.update(54L);
-    }
-
-    /** Clearing the state must fail with an {@link UnsupportedOperationException}. */
-    @Test(expected = UnsupportedOperationException.class)
-    public void testClear() {
-        assertInitialValue();
-
-        valueState.clear();
-    }
-
-    /** Checks that the state still reports the value written in {@link #setUp()}. */
-    private void assertInitialValue() {
-        final long current = valueState.value();
-        assertEquals(42L, current);
-    }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties b/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
deleted file mode 100644
index 10792cd..0000000
--- a/flink-queryable-state/flink-queryable-state-java/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,31 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-# Set root logger level to OFF to not flood build logs
-# set manually to INFO for debugging purposes
-log4j.rootLogger=OFF, testlogger
-
-# testlogger is set to be a ConsoleAppender.
-log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
-log4j.appender.testlogger.target = System.err
-log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
-log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
-
-# suppress the irrelevant (wrong) warnings from the netty channel handler
-log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR
-log4j.logger.org.apache.zookeeper=OFF
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/pom.xml b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
new file mode 100644
index 0000000..f39498e
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/pom.xml
@@ -0,0 +1,119 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-queryable-state</artifactId>
+ <version>1.4-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>flink-queryable-state-runtime_${scala.binary.version}</artifactId>
+ <name>flink-queryable-state-runtime</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+
+ <!-- core dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-queryable-state-client-java_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- ===================================================
+ Testing
+ =================================================== -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils-junit</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.curator</groupId>
+ <artifactId>curator-test</artifactId>
+ <version>${curator.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-runtime_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
new file mode 100644
index 0000000..d434336
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateKeyGroupLocationException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.Client;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.DisabledKvStateRequestStats;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
+import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This handler acts as an internal (to the Flink cluster) client that receives
+ * the requests from external clients, executes them by contacting the Job Manager (if necessary) and
+ * the Task Manager holding the requested state, and forwards the answer back to the client.
+ *
+ * <p>Looked-up state locations are cached per (job id, queryable state name). A request that
+ * fails in a way that suggests the cached location is stale is retried with a forced lookup.
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateClientProxyHandler extends AbstractServerHandler<KvStateRequest, KvStateResponse> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KvStateClientProxyHandler.class);
+
+    /** The proxy using this handler. */
+    private final KvStateClientProxy proxy;
+
+    /** A cache to hold the location of different states for which we have already seen requests. */
+    private final ConcurrentMap<Tuple2<JobID, String>, CompletableFuture<KvStateLocation>> lookupCache =
+        new ConcurrentHashMap<>();
+
+    /**
+     * Network client to forward queries to {@link KvStateServerImpl state server}
+     * instances inside the cluster.
+     */
+    private final Client<KvStateInternalRequest, KvStateResponse> kvStateClient;
+
+    /**
+     * Create the handler used by the {@link KvStateClientProxyImpl}.
+     *
+     * @param proxy the {@link KvStateClientProxyImpl proxy} using the handler.
+     * @param queryExecutorThreads the number of threads handed to the internal {@link Client}
+     *                             that forwards the queries to the state servers.
+     * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+     * @param stats server statistics collector.
+     */
+    public KvStateClientProxyHandler(
+            final KvStateClientProxyImpl proxy,
+            final int queryExecutorThreads,
+            final MessageSerializer<KvStateRequest, KvStateResponse> serializer,
+            final KvStateRequestStats stats) {
+
+        super(proxy, serializer, stats);
+        this.proxy = Preconditions.checkNotNull(proxy);
+        this.kvStateClient = createInternalClient(queryExecutorThreads);
+    }
+
+    /**
+     * Creates the client used to talk to the state servers. Its request statistics are
+     * disabled via {@link DisabledKvStateRequestStats}.
+     *
+     * @param threads thread count passed on to the {@link Client} constructor.
+     * @return the client forwarding {@link KvStateInternalRequest internal requests}.
+     */
+    private static Client<KvStateInternalRequest, KvStateResponse> createInternalClient(int threads) {
+        final MessageSerializer<KvStateInternalRequest, KvStateResponse> messageSerializer =
+            new MessageSerializer<>(
+                new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+                new KvStateResponse.KvStateResponseDeserializer());
+
+        return new Client<>(
+            "Queryable State Proxy Client",
+            threads,
+            messageSerializer,
+            new DisabledKvStateRequestStats());
+    }
+
+    /**
+     * Starts the asynchronous execution of the given request, initially allowing the
+     * cached state location to be used.
+     *
+     * @param requestId id of the request (not used by this implementation).
+     * @param request the request received from the external client.
+     * @return future completed with the response, or exceptionally if the query failed.
+     */
+    @Override
+    public CompletableFuture<KvStateResponse> handleRequest(
+            final long requestId,
+            final KvStateRequest request) {
+        CompletableFuture<KvStateResponse> response = new CompletableFuture<>();
+        executeActionAsync(response, request, false);
+        return response;
+    }
+
+    /**
+     * Executes the request and completes {@code result} with its outcome. Failures whose
+     * cause hints at an outdated location trigger another attempt with a forced lookup.
+     *
+     * @param result the future handed to the caller; nothing is done if it is already done.
+     * @param request the request to execute.
+     * @param update whether the state location must be looked up again instead of using the cache.
+     */
+    private void executeActionAsync(
+            final CompletableFuture<KvStateResponse> result,
+            final KvStateRequest request,
+            final boolean update) {
+
+        if (!result.isDone()) {
+            final CompletableFuture<KvStateResponse> operationFuture = getState(request, update);
+            operationFuture.whenCompleteAsync(
+                (t, throwable) -> {
+                    if (throwable != null) {
+                        if (throwable instanceof CancellationException) {
+                            result.completeExceptionally(throwable);
+                        } else if (throwable.getCause() instanceof UnknownKvStateIdException ||
+                                throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
+                                throwable.getCause() instanceof UnknownKvStateLocation ||
+                                throwable.getCause() instanceof ConnectException) {
+
+                            // These failures are likely to be caused by out-of-sync
+                            // KvStateLocation. Therefore we retry this query and
+                            // force look up the location.
+
+                            executeActionAsync(result, request, true);
+                        } else {
+                            result.completeExceptionally(throwable);
+                        }
+                    } else {
+                        result.complete(t);
+                    }
+                }, queryExecutor);
+
+            // once the caller-facing future is done (for whatever reason), the pending
+            // operation is no longer needed
+            result.whenComplete(
+                (t, throwable) -> operationFuture.cancel(false));
+        }
+    }
+
+    /**
+     * Resolves the location of the requested state and forwards the query to the state
+     * server registered for the key group of the requested key.
+     *
+     * @param request the request to execute.
+     * @param forceUpdate whether to bypass the location cache.
+     * @return future with the state server's response; it fails with an
+     *         {@link UnknownKvStateKeyGroupLocationException} if no server address is
+     *         known for the key group.
+     */
+    private CompletableFuture<KvStateResponse> getState(
+            final KvStateRequest request,
+            final boolean forceUpdate) {
+
+        return getKvStateLookupInfo(request.getJobId(), request.getStateName(), forceUpdate)
+            .thenComposeAsync((Function<KvStateLocation, CompletableFuture<KvStateResponse>>) location -> {
+                final int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(
+                    request.getKeyHashCode(), location.getNumKeyGroups());
+
+                final InetSocketAddress serverAddress = location.getKvStateServerAddress(keyGroupIndex);
+                if (serverAddress == null) {
+                    return FutureUtils.getFailedFuture(new UnknownKvStateKeyGroupLocationException(getServerName()));
+                } else {
+                    // Query server
+                    final KvStateID kvStateId = location.getKvStateID(keyGroupIndex);
+                    final KvStateInternalRequest internalRequest = new KvStateInternalRequest(
+                        kvStateId, request.getSerializedKeyAndNamespace());
+                    return kvStateClient.sendRequest(serverAddress, internalRequest);
+                }
+            }, queryExecutor);
+    }
+
+    /**
+     * Lookup the {@link KvStateLocation} for the given job and queryable state name.
+     *
+     * <p>The job manager will be queried for the location only if forced or no
+     * usable cached location can be found. A cached location is returned as is, so there
+     * are no guarantees about it still being up to date.
+     *
+     * @param jobId JobID the state instance belongs to.
+     * @param queryableStateName Name under which the state instance has been published.
+     * @param forceUpdate Flag to indicate whether to force an update via the lookup service.
+     * @return Future holding the KvStateLocation
+     */
+    private CompletableFuture<KvStateLocation> getKvStateLookupInfo(
+            final JobID jobId,
+            final String queryableStateName,
+            final boolean forceUpdate) {
+
+        final Tuple2<JobID, String> cacheKey = new Tuple2<>(jobId, queryableStateName);
+        final CompletableFuture<KvStateLocation> cachedFuture = lookupCache.get(cacheKey);
+
+        // a cached lookup that failed is not reused; it is replaced by a fresh lookup below
+        if (!forceUpdate && cachedFuture != null && !cachedFuture.isCompletedExceptionally()) {
+            // arguments follow the placeholder order: state name first, then job id
+            LOG.debug("Retrieving location for state={} of job={} from the cache.", queryableStateName, jobId);
+            return cachedFuture;
+        }
+
+        LOG.debug("Retrieving location for state={} of job={} from the job manager.", queryableStateName, jobId);
+
+        return proxy.getJobManagerFuture().thenComposeAsync(
+            jobManagerGateway -> {
+                final Object msg = new KvStateMessage.LookupKvStateLocation(jobId, queryableStateName);
+                final CompletableFuture<KvStateLocation> locationFuture = FutureUtils.toJava(
+                    jobManagerGateway.ask(msg, FiniteDuration.apply(1000L, TimeUnit.MILLISECONDS))
+                        .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class)));
+
+                // remember the (possibly still pending) lookup for subsequent requests
+                lookupCache.put(cacheKey, locationFuture);
+                return locationFuture;
+            }, queryExecutor);
+    }
+
+    /** Shuts down the internal client used to contact the state servers. */
+    @Override
+    public void shutdown() {
+        kvStateClient.shutdown();
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
new file mode 100644
index 0000000..f473443
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyImpl.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.client.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.exceptions.UnknownJobManagerException;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.query.KvStateClientProxy;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Default {@link KvStateClientProxy}: a server for {@link KvStateRequest}s that also tracks the leading job manager.
+ */
+@Internal
+public class KvStateClientProxyImpl extends AbstractServerBase<KvStateRequest, KvStateResponse> implements KvStateClientProxy {
+
+ /** Failed future that stands in for the job manager while none is known. */
+ private static final CompletableFuture<ActorGateway> UNKNOWN_JOB_MANAGER =
+ FutureUtils.getFailedFuture(new UnknownJobManagerException());
+
+ /** Number of query threads passed on to the request handler. */
+ private final int queryExecutorThreads;
+
+ /** Collector for request statistics. */
+ private final KvStateRequestStats stats;
+
+ /** Guards every read and write of {@link #jobManagerFuture}. */
+ private final Object leaderLock = new Object();
+
+ /** Future of the current job manager, or {@link #UNKNOWN_JOB_MANAGER}. */
+ private CompletableFuture<ActorGateway> jobManagerFuture = UNKNOWN_JOB_MANAGER;
+
+ /**
+ * Creates the Queryable State Client Proxy.
+ *
+ * <p>Instances are created reflectively by
+ * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)
+ * QueryableStateUtils.createKvStateClientProxy(InetAddress, Iterator, int, int, KvStateRequestStats)}.
+ *
+ * <p>The proxy must be started via {@link #start()} before it binds
+ * to the configured bind address.
+ *
+ * @param bindAddress address the server listens on.
+ * @param bindPortIterator candidate ports to try binding to.
+ * @param numEventLoopThreads number of event loop threads.
+ * @param numQueryThreads number of query threads; must be at least 1.
+ * @param stats statistics collector; must not be {@code null}.
+ */
+ public KvStateClientProxyImpl(
+ final InetAddress bindAddress,
+ final Iterator<Integer> bindPortIterator,
+ final Integer numEventLoopThreads,
+ final Integer numQueryThreads,
+ final KvStateRequestStats stats) {
+
+ super("Queryable State Proxy Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
+ Preconditions.checkArgument(numQueryThreads >= 1, "Non-positive number of query threads.");
+ this.queryExecutorThreads = numQueryThreads;
+ this.stats = Preconditions.checkNotNull(stats);
+ }
+
+ @Override
+ public InetSocketAddress getServerAddress() {
+ return super.getServerAddress();
+ }
+
+ @Override
+ public void start() throws Throwable {
+ super.start();
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+
+ @Override
+ public void updateJobManager(CompletableFuture<ActorGateway> leadingJobManager) throws Exception {
+ synchronized (leaderLock) {
+ // A null argument resets the proxy to the failed "unknown job manager" future.
+ jobManagerFuture = (leadingJobManager != null) ? leadingJobManager : UNKNOWN_JOB_MANAGER;
+ }
+ }
+
+ @Override
+ public CompletableFuture<ActorGateway> getJobManagerFuture() {
+ synchronized (leaderLock) {
+ return jobManagerFuture;
+ }
+ }
+
+ @Override
+ public AbstractServerHandler<KvStateRequest, KvStateResponse> initializeHandler() {
+ // The handler receives a serializer built from the request and response deserializers.
+ final MessageSerializer<KvStateRequest, KvStateResponse> messageSerializer = new MessageSerializer<>(
+ new KvStateRequest.KvStateRequestDeserializer(),
+ new KvStateResponse.KvStateResponseDeserializer());
+ return new KvStateClientProxyHandler(this, queryExecutorThreads, messageSerializer, stats);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
new file mode 100644
index 0000000..8c8de59
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/messages/KvStateInternalRequest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.messages;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.KvStateID;
+import org.apache.flink.queryablestate.network.messages.MessageBody;
+import org.apache.flink.queryablestate.network.messages.MessageDeserializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+
+import java.nio.ByteBuffer;
+
+/**
+ * The request to be forwarded by the {@link org.apache.flink.runtime.query.KvStateClientProxy
+ * Queryable State Client Proxy} to the {@link org.apache.flink.runtime.query.KvStateServer State Server}
+ * of the Task Manager responsible for the requested state.
+ */
+@Internal
+public class KvStateInternalRequest extends MessageBody {
+
+ private final KvStateID kvStateId;
+ private final byte[] serializedKeyAndNamespace;
+
+ public KvStateInternalRequest(
+ final KvStateID stateId,
+ final byte[] serializedKeyAndNamespace) {
+
+ this.kvStateId = Preconditions.checkNotNull(stateId);
+ this.serializedKeyAndNamespace = Preconditions.checkNotNull(serializedKeyAndNamespace);
+ }
+
+ public KvStateID getKvStateId() {
+ return kvStateId;
+ }
+
+ public byte[] getSerializedKeyAndNamespace() {
+ return serializedKeyAndNamespace;
+ }
+
+ @Override
+ public byte[] serialize() {
+
+ // KvStateId + sizeOf(serializedKeyAndNamespace) + serializedKeyAndNamespace
+ final int size = KvStateID.SIZE + Integer.BYTES + serializedKeyAndNamespace.length;
+
+ return ByteBuffer.allocate(size)
+ .putLong(kvStateId.getLowerPart())
+ .putLong(kvStateId.getUpperPart())
+ .putInt(serializedKeyAndNamespace.length)
+ .put(serializedKeyAndNamespace)
+ .array();
+ }
+
+ /**
+ * A {@link MessageDeserializer deserializer} for {@link KvStateInternalRequest}.
+ */
+ public static class KvStateInternalRequestDeserializer implements MessageDeserializer<KvStateInternalRequest> {
+
+ @Override
+ public KvStateInternalRequest deserializeMessage(ByteBuf buf) {
+ KvStateID kvStateId = new KvStateID(buf.readLong(), buf.readLong());
+
+ int length = buf.readInt();
+ Preconditions.checkArgument(length >= 0,
+ "Negative length for key and namespace. " +
+ "This indicates a serialization error.");
+
+ byte[] serializedKeyAndNamespace = new byte[length];
+ if (length > 0) {
+ buf.readBytes(serializedKeyAndNamespace);
+ }
+ return new KvStateInternalRequest(kvStateId, serializedKeyAndNamespace);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
new file mode 100644
index 0000000..476f153
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerHandler.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.server;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
+import org.apache.flink.queryablestate.exceptions.UnknownKvStateIdException;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * This handler dispatches asynchronous tasks, which query {@link InternalKvState}
+ * instances and write the result to the channel.
+ *
+ * <p>The network threads receive the message, deserialize it and dispatch the
+ * query task. The actual query is handled in a separate thread as it might
+ * otherwise block the network threads (file I/O etc.).
+ */
+@Internal
+@ChannelHandler.Sharable
+public class KvStateServerHandler extends AbstractServerHandler<KvStateInternalRequest, KvStateResponse> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KvStateServerHandler.class);
+
+ /** KvState registry holding references to the KvState instances. */
+ private final KvStateRegistry registry;
+
+ /**
+ * Create the handler used by the {@link KvStateServerImpl}.
+ *
+ * @param server the {@link KvStateServerImpl} using the handler.
+ * @param kvStateRegistry registry to query.
+ * @param serializer the {@link MessageSerializer} used to (de-) serialize the different messages.
+ * @param stats server statistics collector.
+ */
+ public KvStateServerHandler(
+ final KvStateServerImpl server,
+ final KvStateRegistry kvStateRegistry,
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer,
+ final KvStateRequestStats stats) {
+
+ super(server, serializer, stats);
+ this.registry = Preconditions.checkNotNull(kvStateRegistry);
+ }
+
+ @Override
+ public CompletableFuture<KvStateResponse> handleRequest(final long requestId, final KvStateInternalRequest request) {
+ final CompletableFuture<KvStateResponse> responseFuture = new CompletableFuture<>();
+
+ try {
+ final InternalKvState<?> kvState = registry.getKvState(request.getKvStateId());
+ if (kvState == null) {
+ responseFuture.completeExceptionally(new UnknownKvStateIdException(getServerName(), request.getKvStateId()));
+ } else {
+ byte[] serializedKeyAndNamespace = request.getSerializedKeyAndNamespace();
+
+ byte[] serializedResult = kvState.getSerializedValue(serializedKeyAndNamespace);
+ if (serializedResult != null) {
+ responseFuture.complete(new KvStateResponse(serializedResult));
+ } else {
+ responseFuture.completeExceptionally(new UnknownKeyOrNamespaceException(getServerName()));
+ }
+ }
+ return responseFuture;
+ } catch (Throwable t) {
+ String errMsg = "Error while processing request with ID " + requestId +
+ ". Caused by: " + ExceptionUtils.stringifyException(t);
+ responseFuture.completeExceptionally(new RuntimeException(errMsg));
+ return responseFuture;
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ // do nothing
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0c771505/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
new file mode 100644
index 0000000..fe07687
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.server;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.queryablestate.messages.KvStateInternalRequest;
+import org.apache.flink.queryablestate.messages.KvStateResponse;
+import org.apache.flink.queryablestate.network.AbstractServerBase;
+import org.apache.flink.queryablestate.network.AbstractServerHandler;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.stats.KvStateRequestStats;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+
+/** Default implementation of the {@link KvStateServer}, handling requests through a {@link KvStateServerHandler}. */
+@Internal
+public class KvStateServerImpl extends AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements KvStateServer {
+ private static final Logger LOG = LoggerFactory.getLogger(KvStateServerImpl.class);
+
+ /** Registry of state instances, handed to the {@link KvStateServerHandler}. */
+ private final KvStateRegistry kvStateRegistry;
+
+ /** Collector for request statistics. */
+ private final KvStateRequestStats stats;
+
+ /** Set by {@link #initializeHandler()}; {@code null} until then. */
+ private MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer;
+
+ /**
+ * Creates the state server.
+ *
+ * <p>Instances are created reflectively by
+ * {@link org.apache.flink.runtime.query.QueryableStateUtils#createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)
+ * QueryableStateUtils.createKvStateServer(InetAddress, Iterator, int, int, KvStateRegistry, KvStateRequestStats)}.
+ *
+ * <p>The server must be started via {@link #start()} before it binds to the configured bind address.
+ *
+ * @param bindAddress address the server listens on.
+ * @param bindPortIterator candidate ports to try binding to.
+ * @param numEventLoopThreads number of event loop threads.
+ * @param numQueryThreads number of query threads.
+ * @param kvStateRegistry {@link KvStateRegistry} to query for state instances; must not be {@code null}.
+ * @param stats statistics collector; must not be {@code null}.
+ */
+ public KvStateServerImpl(
+ final InetAddress bindAddress,
+ final Iterator<Integer> bindPortIterator,
+ final Integer numEventLoopThreads,
+ final Integer numQueryThreads,
+ final KvStateRegistry kvStateRegistry,
+ final KvStateRequestStats stats) {
+
+ super("Queryable State Server", bindAddress, bindPortIterator, numEventLoopThreads, numQueryThreads);
+ this.stats = Preconditions.checkNotNull(stats);
+ this.kvStateRegistry = Preconditions.checkNotNull(kvStateRegistry);
+ }
+
+ @Override
+ public AbstractServerHandler<KvStateInternalRequest, KvStateResponse> initializeHandler() {
+ final MessageSerializer<KvStateInternalRequest, KvStateResponse> created = new MessageSerializer<>(
+ new KvStateInternalRequest.KvStateInternalRequestDeserializer(),
+ new KvStateResponse.KvStateResponseDeserializer());
+ this.serializer = created;
+ return new KvStateServerHandler(this, kvStateRegistry, created, stats);
+ }
+
+ /** Returns the serializer; the state check fails if {@link #initializeHandler()} has not run yet. */
+ public MessageSerializer<KvStateInternalRequest, KvStateResponse> getSerializer() {
+ Preconditions.checkState(serializer != null, "Server " + getServerName() + " has not been started.");
+ return serializer;
+ }
+
+ @Override
+ public void start() throws Throwable {
+ super.start();
+ }
+
+ @Override
+ public InetSocketAddress getServerAddress() {
+ return super.getServerAddress();
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ }
+}