You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/11 15:46:06 UTC
[06/14] flink git commit: [FLINK-7769][QS] Move queryable state
outside the runtime.
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..15a5ff6
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateITCase.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the HA mode.
+ *
+ * <p>The cluster is backed by a Curator {@link TestingServer} and is started once
+ * and shared by all tests of the concrete subclass.
+ */
+public abstract class HAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
+
+	private static final int NUM_JMS = 2;
+	private static final int NUM_TMS = 4;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	private static TestingServer zkServer;
+	private static TemporaryFolder temporaryFolder;
+
+	/**
+	 * Starts the ZooKeeper server, the HA cluster and the test actor system once
+	 * for all tests of the class. Any exception is reported as a test failure.
+	 */
+	@BeforeClass
+	public static void setup() {
+		try {
+			zkServer = new TestingServer();
+			temporaryFolder = new TemporaryFolder();
+			temporaryFolder.create();
+
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, NUM_JMS);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 2);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 2);
+			config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, temporaryFolder.newFolder().toString());
+			config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+			config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+
+			cluster = new TestingCluster(config, false);
+			cluster.start();
+
+			testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+			// verify that we are in HA mode
+			Assert.assertEquals(HighAvailabilityMode.ZOOKEEPER, cluster.haMode());
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Releases everything that {@link #setup()} managed to create. Each resource
+	 * is checked for {@code null} because setup may have failed part-way, and the
+	 * temporary folder is deleted even if stopping ZooKeeper fails.
+	 */
+	@AfterClass
+	public static void tearDown() {
+		if (cluster != null) {
+			cluster.stop();
+			cluster.awaitTermination();
+		}
+
+		if (testActorSystem != null) {
+			testActorSystem.shutdown();
+			testActorSystem.awaitTermination();
+		}
+
+		try {
+			if (zkServer != null) {
+				zkServer.stop();
+				zkServer.close();
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		} finally {
+			if (temporaryFolder != null) {
+				temporaryFolder.delete();
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..a2d3ad0
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the HA queryable state integration tests against the {@link FsStateBackend}.
+ */
+public class HAQueryableStateITCaseFsBackend extends HAAbstractQueryableStateITCase {
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String backendUri = temporaryFolder.newFolder().toURI().toString();
+		return new FsStateBackend(backendUri);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..fda1171
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the HA queryable state integration tests against the {@link RocksDBStateBackend}.
+ */
+public class HAQueryableStateITCaseRocksDBBackend extends HAAbstractQueryableStateITCase {
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String backendUri = temporaryFolder.newFolder().toURI().toString();
+		return new RocksDBStateBackend(backendUri);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..907e8a3
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization using
+ * the KvStateSerializer with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
+	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
+	 * the tests.
+	 *
+	 * @param <K> key type
+	 */
+	static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+		RocksDBKeyedStateBackend2(
+				final String operatorIdentifier,
+				final ClassLoader userCodeClassLoader,
+				final File instanceBasePath,
+				final DBOptions dbOptions,
+				final ColumnFamilyOptions columnFamilyOptions,
+				final TaskKvStateRegistry kvStateRegistry,
+				final TypeSerializer<K> keySerializer,
+				final int numberOfKeyGroups,
+				final KeyGroupRange keyGroupRange,
+				final ExecutionConfig executionConfig) throws Exception {
+
+			super(operatorIdentifier, userCodeClassLoader,
+				instanceBasePath,
+				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
+				numberOfKeyGroups, keyGroupRange, executionConfig, false);
+		}
+
+		@Override
+		public <N, T> InternalListState<N, T> createListState(
+				final TypeSerializer<N> namespaceSerializer,
+				final ListStateDescriptor<T> stateDesc) throws Exception {
+
+			return super.createListState(namespaceSerializer, stateDesc);
+		}
+	}
+
+	/**
+	 * Tests list serialization and deserialization match.
+	 *
+	 * <p>The backend is disposed in a {@code finally} block so that it is cleaned
+	 * up even if the check fails.
+	 *
+	 * @see KvStateRequestSerializerTest#testListSerialization()
+	 * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testListSerialization() throws Exception {
+		final long key = 0L;
+
+		// objects for RocksDB state list serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend2<Long> rocksDBKeyedStateBackend =
+			new RocksDBKeyedStateBackend2<>(
+				"no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
+			);
+
+		try {
+			rocksDBKeyedStateBackend.restore(null);
+			rocksDBKeyedStateBackend.setCurrentKey(key);
+
+			final InternalListState<VoidNamespace, Long> listState = rocksDBKeyedStateBackend
+				.createListState(VoidNamespaceSerializer.INSTANCE,
+					new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
+			KvStateRequestSerializerTest.testListSerialization(key, listState);
+		} finally {
+			rocksDBKeyedStateBackend.dispose();
+		}
+	}
+
+	/**
+	 * Tests map serialization and deserialization match.
+	 *
+	 * <p>The backend is disposed in a {@code finally} block so that it is cleaned
+	 * up even if the check fails.
+	 *
+	 * @see KvStateRequestSerializerTest#testMapSerialization()
+	 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testMapSerialization() throws Exception {
+		final long key = 0L;
+
+		// objects for RocksDB state map serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend<Long> rocksDBKeyedStateBackend =
+			new RocksDBKeyedStateBackend<>(
+				"no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig(),
+				false);
+
+		try {
+			rocksDBKeyedStateBackend.restore(null);
+			rocksDBKeyedStateBackend.setCurrentKey(key);
+
+			final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
+				rocksDBKeyedStateBackend.getPartitionedState(
+					VoidNamespace.INSTANCE,
+					VoidNamespaceSerializer.INSTANCE,
+					new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+			KvStateRequestSerializerTest.testMapSerialization(key, mapState);
+		} finally {
+			rocksDBKeyedStateBackend.dispose();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
new file mode 100644
index 0000000..c52acc8
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAAbstractQueryableStateITCase.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.QueryableStateOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Base class with the cluster configuration for the tests on the NON-HA mode.
+ *
+ * <p>The {@link TestingCluster} and the test actor system are started once and
+ * shared by all tests of the concrete subclass.
+ */
+public abstract class NonHAAbstractQueryableStateITCase extends AbstractQueryableStateITCase {
+
+	private static final int NUM_TMS = 2;
+	private static final int NUM_SLOTS_PER_TM = 4;
+
+	/**
+	 * Starts the cluster and the test actor system once for all tests of the
+	 * class. Any exception is reported as a test failure.
+	 */
+	@BeforeClass
+	public static void setup() {
+		try {
+			Configuration config = new Configuration();
+			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
+			config.setInteger(QueryableStateOptions.CLIENT_NETWORK_THREADS, 1);
+			config.setBoolean(QueryableStateOptions.SERVER_ENABLE, true);
+			config.setInteger(QueryableStateOptions.SERVER_NETWORK_THREADS, 1);
+
+			cluster = new TestingCluster(config, false);
+			cluster.start(true);
+
+			testActorSystem = AkkaUtils.createDefaultActorSystem();
+
+			// verify that we are not in HA mode
+			Assert.assertEquals(HighAvailabilityMode.NONE, cluster.haMode());
+
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	/**
+	 * Shuts down the cluster and the test actor system. Both are checked for
+	 * {@code null} because setup may have failed part-way, and the actor system
+	 * is shut down even if stopping the cluster fails.
+	 */
+	@AfterClass
+	public static void tearDown() {
+		try {
+			if (cluster != null) {
+				cluster.shutdown();
+			}
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		} finally {
+			if (testActorSystem != null) {
+				testActorSystem.shutdown();
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
new file mode 100644
index 0000000..caa315a
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseFsBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the non-HA queryable state integration tests against the {@link FsStateBackend}.
+ */
+public class NonHAQueryableStateITCaseFsBackend extends NonHAAbstractQueryableStateITCase {
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String backendUri = temporaryFolder.newFolder().toURI().toString();
+		return new FsStateBackend(backendUri);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
new file mode 100644
index 0000000..10e9b57
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateITCaseRocksDBBackend.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.itcases;
+
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Runs the non-HA queryable state integration tests against the {@link RocksDBStateBackend}.
+ */
+public class NonHAQueryableStateITCaseRocksDBBackend extends NonHAAbstractQueryableStateITCase {
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	@Override
+	protected AbstractStateBackend createStateBackend() throws Exception {
+		final String backendUri = temporaryFolder.newFolder().toURI().toString();
+		return new RocksDBStateBackend(backendUri);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
new file mode 100644
index 0000000..d9a41a1
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/AkkaKvStateLocationLookupServiceTest.java
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.queryablestate.UnknownJobManager;
+import org.apache.flink.queryablestate.client.AkkaKvStateLocationLookupService;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link AkkaKvStateLocationLookupService}.
+ */
+public class AkkaKvStateLocationLookupServiceTest extends TestLogger {
+
+ /** Timeout passed to the lookup service and used when awaiting its lookup futures. */
+ private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+
+ /** Test actor system shared between the tests (created in setUp, shut down in tearDown). */
+ private static ActorSystem testActorSystem;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); // one local actor system for all tests, built from an empty Configuration
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (testActorSystem != null) { // still null if setUp() failed before the assignment
+ testActorSystem.shutdown();
+ }
+ }
+
+ /**
+ * Tests that lookups fail with {@link UnknownJobManager} while no leader is known (initially,
+ * and again after leadership is lost, i.e. a {@code null} leader address) and succeed in between.
+ */
+ @Test
+ public void testNoJobManagerRegistered() throws Exception {
+ TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+ null,
+ null);
+ Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+ AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+ leaderRetrievalService,
+ testActorSystem,
+ TIMEOUT,
+ new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+ lookupService.start();
+
+ // Phase 1: no leader has been announced yet, so awaiting the lookup
+ // future must throw UnknownJobManager and the received queue must
+ // still be empty.
+ try {
+ JobID jobId = new JobID();
+ String name = "coffee";
+
+ Future<KvStateLocation> locationFuture = lookupService.getKvStateLookupInfo(jobId, name);
+
+ Await.result(locationFuture, TIMEOUT);
+ fail("Did not throw expected Exception");
+ } catch (UnknownJobManager ignored) {
+ // Expected: there is no leader to ask
+ }
+
+ assertEquals("Received unexpected lookup", 0, received.size());
+
+ // Phase 2: announce a leader backed by a test actor; the lookup must
+ // now return the location that the test actor was created with.
+ //
+ UUID leaderSessionId = HighAvailabilityServices.DEFAULT_LEADER_ID;
+ KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "tea");
+
+ ActorRef testActor = LookupResponseActor.create(received, leaderSessionId, expected);
+
+ String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+ // Notify the service about the new leader's address and session ID
+ leaderRetrievalService.notifyListener(testActorAddress, leaderSessionId);
+
+ JobID jobId = new JobID();
+ String name = "tea";
+
+ // Verify that the leader's response is returned by the lookup
+ KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, name), TIMEOUT);
+ assertEquals(expected, location);
+
+ // Verify that exactly one lookup message was recorded and that it is the expected one
+ assertEquals(1, received.size());
+
+ verifyLookupMsg(received.poll(), jobId, name);
+
+ // Phase 3: announce the loss of leadership (null address and null
+ // session ID); lookups must throw UnknownJobManager again.
+ //
+ leaderRetrievalService.notifyListener(null, null);
+
+ try {
+ Future<KvStateLocation> locationFuture = lookupService
+ .getKvStateLookupInfo(new JobID(), "coffee");
+
+ Await.result(locationFuture, TIMEOUT);
+ fail("Did not throw expected Exception");
+ } catch (UnknownJobManager ignored) {
+ // Expected: the leader is gone
+ }
+
+ // The queue was drained by poll() above, so it must still be empty
+ assertEquals(0, received.size());
+ }
+
+	/**
+	 * Tests that lookups reach the new leader after the leader and its session ID change.
+	 */
+	@Test
+	public void testLeaderSessionIdChange() throws Exception {
+		TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+			"localhost",
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
+		Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+		AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+			leaderRetrievalService,
+			testActorSystem,
+			TIMEOUT,
+			new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+		lookupService.start();
+
+		// Create two test actors, each with its OWN random leader session ID
+		KvStateLocation expected1 = new KvStateLocation(new JobID(), new JobVertexID(), 8282, "salt");
+		UUID leaderSessionId1 = UUID.randomUUID();
+		ActorRef testActor1 = LookupResponseActor.create(received, leaderSessionId1, expected1);
+		String testActorAddress1 = AkkaUtils.getAkkaURL(testActorSystem, testActor1);
+
+		KvStateLocation expected2 = new KvStateLocation(new JobID(), new JobVertexID(), 22321, "pepper");
+		UUID leaderSessionId2 = UUID.randomUUID();
+		ActorRef testActor2 = LookupResponseActor.create(received, leaderSessionId2, expected2);
+		String testActorAddress2 = AkkaUtils.getAkkaURL(testActorSystem, testActor2);
+
+		JobID jobId = new JobID();
+
+		// First leader: the lookup must return the location given to
+		// testActor1, and exactly one lookup message for the requested
+		// job and name must have been recorded.
+		leaderRetrievalService.notifyListener(testActorAddress1, leaderSessionId1);
+
+		KvStateLocation location = Await.result(lookupService.getKvStateLookupInfo(jobId, "rock"), TIMEOUT);
+		assertEquals(expected1, location);
+
+		assertEquals(1, received.size());
+		verifyLookupMsg(received.poll(), jobId, "rock");
+
+		// Second leader, announced under the second session ID: the
+		// lookup must now return the location given to testActor2, again
+		// with exactly one recorded lookup message.
+		leaderRetrievalService.notifyListener(testActorAddress2, leaderSessionId2);
+
+		location = Await.result(lookupService.getKvStateLookupInfo(jobId, "roll"), TIMEOUT);
+		assertEquals(expected2, location);
+
+		assertEquals(1, received.size());
+		verifyLookupMsg(received.poll(), jobId, "roll");
+	}
+
+ /**
+ * Tests that lookups are retried when no leader notification is available.
+ *
+ * <p>In the first part a lookup is issued while no leader is known and it is verified
+ * that the retry strategy has been asked to retry. In the second part the retry strategy
+ * itself announces a leader from within {@code tryRetry()} and the lookup is expected to
+ * return the location served by the test actor.
+ */
+ @Test
+ public void testRetryOnUnknownJobManager() throws Exception {
+ // Strategies handed out by the factory, one per createRetryStrategy() call
+ final Queue<AkkaKvStateLocationLookupService.LookupRetryStrategy> pendingStrategies = new LinkedBlockingQueue<>();
+
+ AkkaKvStateLocationLookupService.LookupRetryStrategyFactory strategyFactory =
+ new AkkaKvStateLocationLookupService.LookupRetryStrategyFactory() {
+ @Override
+ public AkkaKvStateLocationLookupService.LookupRetryStrategy createRetryStrategy() {
+ return pendingStrategies.poll();
+ }
+ };
+
+ // Neither a leader address nor a leader session ID is known initially
+ final TestingLeaderRetrievalService leaderService = new TestingLeaderRetrievalService(null, null);
+
+ AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+ leaderService,
+ testActorSystem,
+ TIMEOUT,
+ strategyFactory);
+ lookupService.start();
+
+ //
+ // Test call to retry
+ //
+ final AtomicBoolean retried = new AtomicBoolean();
+ pendingStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
+ @Override
+ public FiniteDuration getRetryDelay() {
+ return FiniteDuration.Zero();
+ }
+
+ @Override
+ public boolean tryRetry() {
+ // Allow exactly one retry
+ return retried.compareAndSet(false, true);
+ }
+ });
+
+ Future<KvStateLocation> firstLookup = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
+ Await.ready(firstLookup, TIMEOUT);
+
+ assertTrue("Did not retry ", retried.get());
+
+ //
+ // Test leader notification after retry
+ //
+ Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+ KvStateLocation expected = new KvStateLocation(new JobID(), new JobVertexID(), 12122, "garlic");
+
+ ActorRef responder = LookupResponseActor.create(received, null, expected);
+ final String responderAddress = AkkaUtils.getAkkaURL(testActorSystem, responder);
+
+ pendingStrategies.add(new AkkaKvStateLocationLookupService.LookupRetryStrategy() {
+ @Override
+ public FiniteDuration getRetryDelay() {
+ return new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean tryRetry() {
+ // Announce the leader right before the retry is granted
+ leaderService.notifyListener(responderAddress, HighAvailabilityServices.DEFAULT_LEADER_ID);
+ return true;
+ }
+ });
+
+ Future<KvStateLocation> secondLookup = lookupService.getKvStateLookupInfo(new JobID(), "yessir");
+ KvStateLocation location = Await.result(secondLookup, TIMEOUT);
+ assertEquals(expected, location);
+ }
+
+ /**
+ * Tests that a lookup fails when the leader answers with a response that is not a
+ * {@link KvStateLocation}.
+ */
+ @Test
+ public void testUnexpectedResponseType() throws Exception {
+ TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
+ "localhost",
+ HighAvailabilityServices.DEFAULT_LEADER_ID);
+ Queue<LookupKvStateLocation> received = new LinkedBlockingQueue<>();
+
+ AkkaKvStateLocationLookupService lookupService = new AkkaKvStateLocationLookupService(
+ leaderRetrievalService,
+ testActorSystem,
+ TIMEOUT,
+ new AkkaKvStateLocationLookupService.DisabledLookupRetryStrategyFactory());
+
+ lookupService.start();
+
+ // Create a test actor that answers the lookup with a String instead of a KvStateLocation
+ String expected = "unexpected-response-type";
+ ActorRef testActor = LookupResponseActor.create(received, null, expected);
+ String testActorAddress = AkkaUtils.getAkkaURL(testActorSystem, testActor);
+
+ leaderRetrievalService.notifyListener(testActorAddress, null);
+
+ try {
+ Await.result(lookupService.getKvStateLookupInfo(new JobID(), "spicy"), TIMEOUT);
+ fail("Did not throw expected Exception");
+ } catch (Exception ignored) {
+ // Expected. Deliberately not Throwable: the AssertionError raised by fail()
+ // must propagate, otherwise this test could never fail.
+ }
+ }
+
+ /**
+ * Test actor that records every received {@link LookupKvStateLocation} message and
+ * answers it with the next pre-configured response, as long as one is left.
+ */
+ private static final class LookupResponseActor extends FlinkUntypedActor {
+
+ /** Queue collecting the lookup messages this actor has received. */
+ private final Queue<LookupKvStateLocation> receivedLookups;
+
+ /** Remaining responses; one is consumed per received lookup message. */
+ private final Queue<Object> lookupResponses;
+
+ /** Leader session ID reported by this actor; replaced when a UUID message arrives. */
+ private UUID leaderSessionId;
+
+ public LookupResponseActor(
+ Queue<LookupKvStateLocation> receivedLookups,
+ UUID leaderSessionId, Object... lookupResponses) {
+
+ this.receivedLookups = Preconditions.checkNotNull(receivedLookups, "Received lookups");
+ this.leaderSessionId = leaderSessionId;
+
+ Queue<Object> responses = new ArrayDeque<>();
+ if (lookupResponses != null) {
+ for (int i = 0; i < lookupResponses.length; i++) {
+ responses.add(lookupResponses[i]);
+ }
+ }
+ this.lookupResponses = responses;
+ }
+
+ @Override
+ public void handleMessage(Object message) throws Exception {
+ if (message instanceof UUID) {
+ // A plain UUID message updates the leader session ID of this actor
+ this.leaderSessionId = (UUID) message;
+ return;
+ }
+
+ if (!(message instanceof LookupKvStateLocation)) {
+ LOG.debug("Received unhandled message: {}", message);
+ return;
+ }
+
+ // Remember the lookup so that the test can inspect it
+ receivedLookups.add((LookupKvStateLocation) message);
+
+ Object response = lookupResponses.poll();
+ if (response == null) {
+ // No response configured (anymore): stay silent
+ return;
+ }
+
+ final Object reply;
+ if (response instanceof Throwable) {
+ reply = new Status.Failure((Throwable) response);
+ } else {
+ reply = new Status.Success(response);
+ }
+ sender().tell(reply, self());
+ }
+
+ @Override
+ protected UUID getLeaderSessionID() {
+ return leaderSessionId;
+ }
+
+ /** Creates the actor in the shared test actor system. */
+ private static ActorRef create(
+ Queue<LookupKvStateLocation> receivedLookups,
+ UUID leaderSessionId,
+ Object... lookupResponses) {
+
+ Props props = Props.create(
+ LookupResponseActor.class,
+ receivedLookups,
+ leaderSessionId,
+ lookupResponses);
+
+ return testActorSystem.actorOf(props);
+ }
+ }
+
+ /**
+ * Asserts that the given lookup message is present and refers to the expected job ID
+ * and registration name.
+ */
+ private static void verifyLookupMsg(LookupKvStateLocation lookUpMsg, JobID expectedJobId, String expectedName) {
+ assertNotNull(lookUpMsg);
+
+ JobID actualJobId = lookUpMsg.getJobId();
+ assertEquals(expectedJobId, actualJobId);
+
+ String actualName = lookUpMsg.getRegistrationName();
+ assertEquals(expectedName, actualName);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
new file mode 100644
index 0000000..0b97bda
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientHandlerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.client.KvStateClientHandler;
+import org.apache.flink.queryablestate.client.KvStateClientHandlerCallback;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+
+import org.junit.Test;
+
+import java.nio.channels.ClosedChannelException;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for {@link KvStateClientHandler}.
+ */
+public class KvStateClientHandlerTest {
+
+ /**
+ * Tests that on reads the expected callback methods are called and read
+ * buffers are recycled.
+ *
+ * <p>The handler is driven through an {@link EmbeddedChannel}. The serialized
+ * messages start with a frame length, which is skipped before the buffer is
+ * written to the channel.
+ */
+ @Test
+ public void testReadCallbacksAndBufferRecycling() throws Exception {
+ KvStateClientHandlerCallback callback = mock(KvStateClientHandlerCallback.class);
+
+ EmbeddedChannel channel = new EmbeddedChannel(new KvStateClientHandler(callback));
+
+ //
+ // Request success
+ //
+ ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
+ channel.alloc(),
+ 1222112277,
+ new byte[0]);
+ buf.skipBytes(4); // skip frame length
+
+ // Verify callback
+ channel.writeInbound(buf);
+ verify(callback, times(1)).onRequestResult(eq(1222112277L), any(byte[].class));
+ assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+ //
+ // Request failure
+ //
+ buf = MessageSerializer.serializeKvStateRequestFailure(
+ channel.alloc(),
+ 1222112278,
+ new RuntimeException("Expected test Exception"));
+ buf.skipBytes(4); // skip frame length
+
+ // Verify callback
+ channel.writeInbound(buf);
+ verify(callback, times(1)).onRequestFailure(eq(1222112278L), any(RuntimeException.class));
+ assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+ //
+ // Server failure
+ //
+ buf = MessageSerializer.serializeServerFailure(
+ channel.alloc(),
+ new RuntimeException("Expected test Exception"));
+ buf.skipBytes(4); // skip frame length
+
+ // Verify callback
+ channel.writeInbound(buf);
+ verify(callback, times(1)).onFailure(any(RuntimeException.class));
+ // Same recycling check as for every other inbound message in this test
+ assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+ //
+ // Unexpected messages
+ //
+ buf = channel.alloc().buffer(4).writeInt(1223823);
+
+ // Verify callback (the expected invocation count of onFailure is cumulative)
+ channel.writeInbound(buf);
+ verify(callback, times(2)).onFailure(any(IllegalStateException.class));
+ assertEquals("Buffer not recycled", 0, buf.refCnt());
+
+ //
+ // Exception caught
+ //
+ channel.pipeline().fireExceptionCaught(new RuntimeException("Expected test Exception"));
+ verify(callback, times(3)).onFailure(any(RuntimeException.class));
+
+ //
+ // Channel inactive
+ //
+ channel.pipeline().fireChannelInactive();
+ verify(callback, times(4)).onFailure(any(ClosedChannelException.class));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
new file mode 100644
index 0000000..a2850b3
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateClientTest.java
@@ -0,0 +1,752 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.queryablestate.client.KvStateClient;
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.queryablestate.server.KvStateServerImpl;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.KvStateServer;
+import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.query.netty.AtomicKvStateRequestStats;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.NetUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
+import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link KvStateClient}.
+ */
+public class KvStateClientTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KvStateClientTest.class);
+
+ // Thread pool for client bootstrap (shared between tests)
+ private static final NioEventLoopGroup NIO_GROUP = new NioEventLoopGroup();
+
+ private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);
+
+ /**
+ * Shuts down the event loop group shared by all tests of this class.
+ */
+ @AfterClass
+ public static void tearDown() throws Exception {
+ // NIO_GROUP is a static final field initialized at class load, hence never null
+ NIO_GROUP.shutdownGracefully();
+ }
+
+ /**
+ * Tests simple queries, of which half succeed and half fail.
+ *
+ * <p>The test server only collects the received request buffers. The test thread
+ * then answers them one by one: requests at an even position get a successful
+ * result with a random payload, requests at an odd position get a request failure.
+ * Afterwards the client futures and the request statistics are verified, and it is
+ * checked that no connection is left open after shut down.
+ */
+ @Test
+ public void testSimpleRequests() throws Exception {
+ Deadline deadline = TEST_TIMEOUT.fromNow();
+ AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ KvStateClient client = null;
+ Channel serverChannel = null;
+
+ try {
+ client = new KvStateClient(1, stats);
+
+ // Random result that is returned for every successful request
+ final byte[] expected = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(expected);
+
+ final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+ final AtomicReference<Channel> channel = new AtomicReference<>();
+
+ serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel.set(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ received.add((ByteBuf) msg);
+ }
+ });
+
+ KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+ List<Future<byte[]>> futures = new ArrayList<>();
+
+ int numQueries = 1024;
+
+ for (int i = 0; i < numQueries; i++) {
+ futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
+ }
+
+ // Respond to messages from the test thread, in the order in which they were received
+ Exception testException = new RuntimeException("Expected test Exception");
+
+ for (int i = 0; i < numQueries; i++) {
+ ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertNotNull("Receive timed out", buf);
+
+ Channel ch = channel.get();
+ assertNotNull("Channel not active", ch);
+
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+
+ buf.release();
+
+ if (i % 2 == 0) { // even position: successful result
+ ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
+ serverChannel.alloc(),
+ request.getRequestId(),
+ expected);
+
+ ch.writeAndFlush(response);
+ } else { // odd position: request failure
+ ByteBuf response = MessageSerializer.serializeKvStateRequestFailure(
+ serverChannel.alloc(),
+ request.getRequestId(),
+ testException);
+
+ ch.writeAndFlush(response);
+ }
+ }
+
+ for (int i = 0; i < numQueries; i++) {
+ if (i % 2 == 0) {
+ byte[] serializedResult = Await.result(futures.get(i), deadline.timeLeft());
+ assertArrayEquals(expected, serializedResult);
+ } else {
+ try {
+ Await.result(futures.get(i), deadline.timeLeft());
+ fail("Did not throw expected Exception");
+ } catch (RuntimeException ignored) {
+ // Expected
+ }
+ }
+ }
+
+ assertEquals(numQueries, stats.getNumRequests());
+ int expectedRequests = numQueries / 2;
+
+ // Counts can take some time to propagate, so poll until they match or the deadline expires
+ while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != expectedRequests ||
+ stats.getNumFailed() != expectedRequests)) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(expectedRequests, stats.getNumSuccessful());
+ assertEquals(expectedRequests, stats.getNumFailed());
+ } finally {
+ if (client != null) {
+ client.shutDown();
+ }
+
+ if (serverChannel != null) {
+ serverChannel.close();
+ }
+
+ assertEquals("Channel leak", 0, stats.getNumConnections());
+ }
+ }
+
+ /**
+ * Tests that a request to an unavailable host is failed with ConnectException.
+ *
+ * <p>The request is sent to a port of the local host that was reported as available
+ * by {@link NetUtils#getAvailablePort()}, so no server is expected to listen there.
+ */
+ @Test
+ public void testRequestUnavailableHost() throws Exception {
+ final Deadline deadline = TEST_TIMEOUT.fromNow();
+ final AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ KvStateClient client = null;
+ try {
+ client = new KvStateClient(1, stats);
+
+ // Address without a server behind it
+ final int unusedPort = NetUtils.getAvailablePort();
+ final InetAddress localHost = InetAddress.getLocalHost();
+ KvStateServerAddress unreachable = new KvStateServerAddress(localHost, unusedPort);
+
+ Future<byte[]> pendingRequest = client.getKvState(unreachable, new KvStateID(), new byte[0]);
+
+ try {
+ Await.result(pendingRequest, deadline.timeLeft());
+ fail("Did not throw expected ConnectException");
+ } catch (ConnectException ignored) {
+ // Expected
+ }
+ } finally {
+ if (client != null) {
+ client.shutDown();
+ }
+
+ assertEquals("Channel leak", 0, stats.getNumConnections());
+ }
+ }
+
+ /**
+ * Multiple threads concurrently fire queries.
+ *
+ * <p>Four query tasks share a single client and each dispatch 1024 requests against
+ * a test server that answers every request with the same random payload. The test
+ * verifies the result of every request and the request statistics of the client.
+ */
+ @Test
+ public void testConcurrentQueries() throws Exception {
+ Deadline deadline = TEST_TIMEOUT.fromNow();
+ AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ ExecutorService executor = null;
+ KvStateClient client = null;
+ Channel serverChannel = null;
+
+ final byte[] serializedResult = new byte[1024];
+ ThreadLocalRandom.current().nextBytes(serializedResult);
+
+ try {
+ int numQueryTasks = 4;
+ final int numQueriesPerTask = 1024;
+
+ executor = Executors.newFixedThreadPool(numQueryTasks);
+
+ client = new KvStateClient(1, stats);
+
+ serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ByteBuf buf = (ByteBuf) msg;
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+
+ buf.release();
+
+ ByteBuf response = MessageSerializer.serializeKvStateRequestResult(
+ ctx.alloc(),
+ request.getRequestId(),
+ serializedResult);
+
+ ctx.channel().writeAndFlush(response);
+ }
+ });
+
+ final KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+ final KvStateClient finalClient = client;
+ Callable<List<Future<byte[]>>> queryTask = new Callable<List<Future<byte[]>>>() {
+ @Override
+ public List<Future<byte[]>> call() throws Exception {
+ List<Future<byte[]>> results = new ArrayList<>(numQueriesPerTask);
+
+ for (int i = 0; i < numQueriesPerTask; i++) {
+ results.add(finalClient.getKvState(
+ serverAddress,
+ new KvStateID(),
+ new byte[0]));
+ }
+
+ return results;
+ }
+ };
+
+ // Submit query tasks (the same Callable instance is executed by every task)
+ List<java.util.concurrent.Future<List<Future<byte[]>>>> futures = new ArrayList<>();
+ for (int i = 0; i < numQueryTasks; i++) {
+ futures.add(executor.submit(queryTask));
+ }
+
+ // Verify results: first wait for each task to dispatch its requests, then for every request result
+ for (java.util.concurrent.Future<List<Future<byte[]>>> future : futures) {
+ List<Future<byte[]>> results = future.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ for (Future<byte[]> result : results) {
+ byte[] actual = Await.result(result, deadline.timeLeft());
+ assertArrayEquals(serializedResult, actual);
+ }
+ }
+
+ int totalQueries = numQueryTasks * numQueriesPerTask;
+
+ // Counts can take some time to propagate, so poll until they match or the deadline expires
+ while (deadline.hasTimeLeft() && stats.getNumSuccessful() != totalQueries) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(totalQueries, stats.getNumRequests());
+ assertEquals(totalQueries, stats.getNumSuccessful());
+ } finally {
+ if (executor != null) {
+ executor.shutdown();
+ }
+
+ if (serverChannel != null) {
+ serverChannel.close();
+ }
+
+ if (client != null) {
+ client.shutDown();
+ }
+
+ assertEquals("Channel leak", 0, stats.getNumConnections());
+ }
+ }
+
+ /**
+ * Tests that a server failure closes the connection and removes it from
+ * the established connections.
+ *
+ * <p>Two requests are sent over the same connection. After the server has received
+ * both, it answers with a single server failure message. Both request futures are
+ * expected to fail and the connection count of the client is expected to drop to 0.
+ */
+ @Test
+ public void testFailureClosesChannel() throws Exception {
+ Deadline deadline = TEST_TIMEOUT.fromNow();
+ AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ KvStateClient client = null;
+ Channel serverChannel = null;
+
+ try {
+ client = new KvStateClient(1, stats);
+
+ final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>();
+ final AtomicReference<Channel> channel = new AtomicReference<>();
+
+ serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel.set(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ received.add((ByteBuf) msg);
+ }
+ });
+
+ KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+ // Requests: two requests to the same server address
+ List<Future<byte[]>> futures = new ArrayList<>();
+ futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
+ futures.add(client.getKvState(serverAddress, new KvStateID(), new byte[0]));
+
+ ByteBuf buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertNotNull("Receive timed out", buf);
+ buf.release();
+
+ buf = received.poll(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+ assertNotNull("Receive timed out", buf);
+ buf.release();
+
+ assertEquals(1, stats.getNumConnections());
+
+ Channel ch = channel.get();
+ assertNotNull("Channel not active", ch);
+
+ // Respond with failure: one server failure message for the whole connection
+ ch.writeAndFlush(MessageSerializer.serializeServerFailure(
+ serverChannel.alloc(),
+ new RuntimeException("Expected test server failure")));
+
+ try {
+ Await.result(futures.remove(0), deadline.timeLeft());
+ fail("Did not throw expected server failure");
+ } catch (RuntimeException ignored) {
+ // Expected
+ }
+
+ try {
+ Await.result(futures.remove(0), deadline.timeLeft());
+ fail("Did not throw expected server failure");
+ } catch (RuntimeException ignored) {
+ // Expected
+ }
+
+ assertEquals(0, stats.getNumConnections());
+
+ // Counts can take some time to propagate, so poll until they match or the deadline expires
+ while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
+ stats.getNumFailed() != 2)) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(2, stats.getNumRequests());
+ assertEquals(0, stats.getNumSuccessful());
+ assertEquals(2, stats.getNumFailed());
+ } finally {
+ if (client != null) {
+ client.shutDown();
+ }
+
+ if (serverChannel != null) {
+ serverChannel.close();
+ }
+
+ assertEquals("Channel leak", 0, stats.getNumConnections());
+ }
+ }
+
+ /**
+ * Tests that closing the channel on the server side closes the connection and
+ * removes it from the established connections.
+ *
+ * <p>A single request is sent. As soon as the server has received it, the server
+ * closes its end of the connection and the request future is expected to fail
+ * with a {@link ClosedChannelException}.
+ */
+ @Test
+ public void testServerClosesChannel() throws Exception {
+ Deadline deadline = TEST_TIMEOUT.fromNow();
+ AtomicKvStateRequestStats stats = new AtomicKvStateRequestStats();
+
+ KvStateClient client = null;
+ Channel serverChannel = null;
+
+ try {
+ client = new KvStateClient(1, stats);
+
+ final AtomicBoolean received = new AtomicBoolean();
+ final AtomicReference<Channel> channel = new AtomicReference<>();
+
+ serverChannel = createServerChannel(new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ channel.set(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ // The request content is irrelevant for this test; release the buffer
+ // (as the other test handlers do) so that it is not leaked.
+ ((ByteBuf) msg).release();
+ received.set(true);
+ }
+ });
+
+ KvStateServerAddress serverAddress = getKvStateServerAddress(serverChannel);
+
+ // Requests
+ Future<byte[]> future = client.getKvState(serverAddress, new KvStateID(), new byte[0]);
+
+ while (!received.get() && deadline.hasTimeLeft()) {
+ Thread.sleep(50);
+ }
+ assertTrue("Receive timed out", received.get());
+
+ assertEquals(1, stats.getNumConnections());
+
+ // Close the server side of the connection
+ channel.get().close().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+ try {
+ Await.result(future, deadline.timeLeft());
+ fail("Did not throw expected ClosedChannelException");
+ } catch (ClosedChannelException ignored) {
+ // Expected
+ }
+
+ assertEquals(0, stats.getNumConnections());
+
+ // Counts can take some time to propagate
+ while (deadline.hasTimeLeft() && (stats.getNumSuccessful() != 0 ||
+ stats.getNumFailed() != 1)) {
+ Thread.sleep(100);
+ }
+
+ assertEquals(1, stats.getNumRequests());
+ assertEquals(0, stats.getNumSuccessful());
+ assertEquals(1, stats.getNumFailed());
+ } finally {
+ if (client != null) {
+ client.shutDown();
+ }
+
+ if (serverChannel != null) {
+ serverChannel.close();
+ }
+
+ assertEquals("Channel leak", 0, stats.getNumConnections());
+ }
+ }
+
+ /**
+ * Tests multiple clients querying multiple servers until 100k queries have
+ * been processed. At this point, the client is shut down and its verified
+ * that all ongoing requests are failed.
+ */
+ @Test
+ public void testClientServerIntegration() throws Exception {
+ // Config
+ final int numServers = 2;
+ final int numServerEventLoopThreads = 2;
+ final int numServerQueryThreads = 2;
+
+ final int numClientEventLoopThreads = 4;
+ final int numClientsTasks = 8;
+
+ final int batchSize = 16;
+
+ final int numKeyGroups = 1;
+
+ AbstractStateBackend abstractBackend = new MemoryStateBackend();
+ KvStateRegistry dummyRegistry = new KvStateRegistry();
+ DummyEnvironment dummyEnv = new DummyEnvironment("test", 1, 0);
+ dummyEnv.setKvStateRegistry(dummyRegistry);
+
+ AbstractKeyedStateBackend<Integer> backend = abstractBackend.createKeyedStateBackend(
+ dummyEnv,
+ new JobID(),
+ "test_op",
+ IntSerializer.INSTANCE,
+ numKeyGroups,
+ new KeyGroupRange(0, 0),
+ dummyRegistry.createTaskRegistry(new JobID(), new JobVertexID()));
+
+ final FiniteDuration timeout = new FiniteDuration(10, TimeUnit.SECONDS);
+
+ AtomicKvStateRequestStats clientStats = new AtomicKvStateRequestStats();
+
+ KvStateClient client = null;
+ ExecutorService clientTaskExecutor = null;
+ final KvStateServer[] server = new KvStateServer[numServers];
+
+ try {
+ client = new KvStateClient(numClientEventLoopThreads, clientStats);
+ clientTaskExecutor = Executors.newFixedThreadPool(numClientsTasks);
+
+ // Create state
+ ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("any", IntSerializer.INSTANCE);
+ desc.setQueryable("any");
+
+ // Create servers
+ KvStateRegistry[] registry = new KvStateRegistry[numServers];
+ AtomicKvStateRequestStats[] serverStats = new AtomicKvStateRequestStats[numServers];
+ final KvStateID[] ids = new KvStateID[numServers];
+
+ for (int i = 0; i < numServers; i++) {
+ registry[i] = new KvStateRegistry();
+ serverStats[i] = new AtomicKvStateRequestStats();
+ server[i] = new KvStateServerImpl(
+ InetAddress.getLocalHost(),
+ 0,
+ numServerEventLoopThreads,
+ numServerQueryThreads,
+ registry[i],
+ serverStats[i]);
+
+ server[i].start();
+
+ backend.setCurrentKey(1010 + i);
+
+ // Value per server
+ ValueState<Integer> state = backend.getPartitionedState(VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ desc);
+
+ state.update(201 + i);
+
+ // we know it must be a KvStat but this is not exposed to the user via State
+ InternalKvState<?> kvState = (InternalKvState<?>) state;
+
+ // Register KvState (one state instance for all server)
+ ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState);
+ }
+
+ final KvStateClient finalClient = client;
+ Callable<Void> queryTask = new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ while (true) {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ // Random server permutation
+ List<Integer> random = new ArrayList<>();
+ for (int j = 0; j < batchSize; j++) {
+ random.add(j);
+ }
+ Collections.shuffle(random);
+
+ // Dispatch queries
+ List<Future<byte[]>> futures = new ArrayList<>(batchSize);
+
+ for (int j = 0; j < batchSize; j++) {
+ int targetServer = random.get(j) % numServers;
+
+ byte[] serializedKeyAndNamespace = KvStateSerializer.serializeKeyAndNamespace(
+ 1010 + targetServer,
+ IntSerializer.INSTANCE,
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE);
+
+ futures.add(finalClient.getKvState(
+ server[targetServer].getAddress(),
+ ids[targetServer],
+ serializedKeyAndNamespace));
+ }
+
+ // Verify results
+ for (int j = 0; j < batchSize; j++) {
+ int targetServer = random.get(j) % numServers;
+
+ Future<byte[]> future = futures.get(j);
+ byte[] buf = Await.result(future, timeout);
+ int value = KvStateSerializer.deserializeValue(buf, IntSerializer.INSTANCE);
+ assertEquals(201 + targetServer, value);
+ }
+ }
+ }
+ };
+
+ // Submit tasks
+ List<java.util.concurrent.Future<Void>> taskFutures = new ArrayList<>();
+ for (int i = 0; i < numClientsTasks; i++) {
+ taskFutures.add(clientTaskExecutor.submit(queryTask));
+ }
+
+ long numRequests;
+ while ((numRequests = clientStats.getNumRequests()) < 100_000) {
+ Thread.sleep(100);
+ LOG.info("Number of requests {}/100_000", numRequests);
+ }
+
+ // Shut down
+ client.shutDown();
+
+ for (java.util.concurrent.Future<Void> future : taskFutures) {
+ try {
+ future.get();
+ fail("Did not throw expected Exception after shut down");
+ } catch (ExecutionException t) {
+ if (t.getCause() instanceof ClosedChannelException ||
+ t.getCause() instanceof IllegalStateException) {
+ // Expected
+ } else {
+ t.printStackTrace();
+ fail("Failed with unexpected Exception type: " + t.getClass().getName());
+ }
+ }
+ }
+
+ assertEquals("Connection leak (client)", 0, clientStats.getNumConnections());
+ for (int i = 0; i < numServers; i++) {
+ boolean success = false;
+ int numRetries = 0;
+ while (!success) {
+ try {
+ assertEquals("Connection leak (server)", 0, serverStats[i].getNumConnections());
+ success = true;
+ } catch (Throwable t) {
+ if (numRetries < 10) {
+ LOG.info("Retrying connection leak check (server)");
+ Thread.sleep((numRetries + 1) * 50);
+ numRetries++;
+ } else {
+ throw t;
+ }
+ }
+ }
+ }
+ } finally {
+ if (client != null) {
+ client.shutDown();
+ }
+
+ for (int i = 0; i < numServers; i++) {
+ if (server[i] != null) {
+ server[i].shutDown();
+ }
+ }
+
+ if (clientTaskExecutor != null) {
+ clientTaskExecutor.shutdown();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Binds a test server to a free port of the local host and returns its server channel.
+ *
+ * <p>Every accepted connection gets a pipeline consisting of a length field based
+ * frame decoder followed by the given handlers.
+ */
+ private Channel createServerChannel(final ChannelHandler... handlers) throws UnknownHostException, InterruptedException {
+ // Pipeline of each accepted connection
+ ChannelInitializer<SocketChannel> pipelineInitializer = new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
+ ch.pipeline().addLast(handlers);
+ }
+ };
+
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ // Bind address and port (0 = any free port)
+ bootstrap.localAddress(InetAddress.getLocalHost(), 0);
+ // NIO server channels on the shared event loop group
+ bootstrap.group(NIO_GROUP);
+ bootstrap.channel(NioServerSocketChannel.class);
+ bootstrap.childHandler(pipelineInitializer);
+
+ return bootstrap.bind().sync().channel();
+ }
+
+ /**
+ * Creates the {@link KvStateServerAddress} for the local address of the given server channel.
+ */
+ private KvStateServerAddress getKvStateServerAddress(Channel serverChannel) {
+ InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.localAddress();
+ InetAddress host = boundAddress.getAddress();
+ int port = boundAddress.getPort();
+
+ return new KvStateServerAddress(host, port);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/29a6e995/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
new file mode 100644
index 0000000..f28ca68
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-java/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.queryablestate.messages.KvStateRequest;
+import org.apache.flink.queryablestate.messages.KvStateRequestFailure;
+import org.apache.flink.queryablestate.messages.KvStateRequestResult;
+import org.apache.flink.queryablestate.network.messages.MessageSerializer;
+import org.apache.flink.queryablestate.network.messages.MessageType;
+import org.apache.flink.runtime.query.KvStateID;
+import org.apache.flink.runtime.query.netty.message.KvStateSerializer;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link KvStateSerializer}.
+ */
+@RunWith(Parameterized.class)
+public class KvStateRequestSerializerTest {
+
+ private final ByteBufAllocator alloc = UnpooledByteBufAllocator.DEFAULT;
+
+ @Parameterized.Parameters
+ public static Collection<Boolean> parameters() {
+ return Arrays.asList(false, true);
+ }
+
+ @Parameterized.Parameter
+ public boolean async;
+
+ /**
+ * Tests KvState request serialization.
+ */
+ @Test
+ public void testKvStateRequestSerialization() throws Exception {
+ long requestId = Integer.MAX_VALUE + 1337L;
+ KvStateID kvStateId = new KvStateID();
+ byte[] serializedKeyAndNamespace = randomByteArray(1024);
+
+ ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+ alloc,
+ requestId,
+ kvStateId,
+ serializedKeyAndNamespace);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(requestId, request.getRequestId());
+ assertEquals(kvStateId, request.getKvStateId());
+ assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+ }
+
+ /**
+ * Tests KvState request serialization with zero-length serialized key and namespace.
+ */
+ @Test
+ public void testKvStateRequestSerializationWithZeroLengthKeyAndNamespace() throws Exception {
+ byte[] serializedKeyAndNamespace = new byte[0];
+
+ ByteBuf buf = MessageSerializer.serializeKvStateRequest(
+ alloc,
+ 1823,
+ new KvStateID(),
+ serializedKeyAndNamespace);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf));
+ KvStateRequest request = MessageSerializer.deserializeKvStateRequest(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertArrayEquals(serializedKeyAndNamespace, request.getSerializedKeyAndNamespace());
+ }
+
+ /**
+ * Tests that we don't try to be smart about <code>null</code> key and namespace.
+ * They should be treated explicitly.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testNullPointerExceptionOnNullSerializedKeyAndNamepsace() throws Exception {
+ new KvStateRequest(0, new KvStateID(), null);
+ }
+
+ /**
+ * Tests KvState request result serialization.
+ */
+ @Test
+ public void testKvStateRequestResultSerialization() throws Exception {
+ long requestId = Integer.MAX_VALUE + 72727278L;
+ byte[] serializedResult = randomByteArray(1024);
+
+ ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
+ alloc,
+ requestId,
+ serializedResult);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+ KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(requestId, request.getRequestId());
+
+ assertArrayEquals(serializedResult, request.getSerializedResult());
+ }
+
+ /**
+ * Tests KvState request result serialization with zero-length serialized result.
+ */
+ @Test
+ public void testKvStateRequestResultSerializationWithZeroLengthSerializedResult() throws Exception {
+ byte[] serializedResult = new byte[0];
+
+ ByteBuf buf = MessageSerializer.serializeKvStateRequestResult(
+ alloc,
+ 72727278,
+ serializedResult);
+
+ int frameLength = buf.readInt();
+
+ assertEquals(MessageType.REQUEST_RESULT, MessageSerializer.deserializeHeader(buf));
+ KvStateRequestResult request = MessageSerializer.deserializeKvStateRequestResult(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertArrayEquals(serializedResult, request.getSerializedResult());
+ }
+
+ /**
+ * Tests that we don't try to be smart about <code>null</code> results.
+ * They should be treated explicitly.
+ */
+ @Test(expected = NullPointerException.class)
+ public void testNullPointerExceptionOnNullSerializedResult() throws Exception {
+ new KvStateRequestResult(0, null);
+ }
+
+ /**
+ * Tests KvState request failure serialization.
+ */
+ @Test
+ public void testKvStateRequestFailureSerialization() throws Exception {
+ long requestId = Integer.MAX_VALUE + 1111222L;
+ IllegalStateException cause = new IllegalStateException("Expected test");
+
+ ByteBuf buf = MessageSerializer.serializeKvStateRequestFailure(
+ alloc,
+ requestId,
+ cause);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.REQUEST_FAILURE, MessageSerializer.deserializeHeader(buf));
+ KvStateRequestFailure request = MessageSerializer.deserializeKvStateRequestFailure(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(requestId, request.getRequestId());
+ assertEquals(cause.getClass(), request.getCause().getClass());
+ assertEquals(cause.getMessage(), request.getCause().getMessage());
+ }
+
+ /**
+ * Tests KvState server failure serialization.
+ */
+ @Test
+ public void testServerFailureSerialization() throws Exception {
+ IllegalStateException cause = new IllegalStateException("Expected test");
+
+ ByteBuf buf = MessageSerializer.serializeServerFailure(alloc, cause);
+
+ int frameLength = buf.readInt();
+ assertEquals(MessageType.SERVER_FAILURE, MessageSerializer.deserializeHeader(buf));
+ Throwable request = MessageSerializer.deserializeServerFailure(buf);
+ assertEquals(buf.readerIndex(), frameLength + 4);
+
+ assertEquals(cause.getClass(), request.getClass());
+ assertEquals(cause.getMessage(), request.getMessage());
+ }
+
+ private byte[] randomByteArray(int capacity) {
+ byte[] bytes = new byte[capacity];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ return bytes;
+ }
+}