You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/29 15:29:40 UTC
flink git commit: [FLINK-7880][QS] Fix QS test instabilities.
Repository: flink
Updated Branches:
refs/heads/master 5231c9300 -> 6b8f7dc2d
[FLINK-7880][QS] Fix QS test instabilities.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b8f7dc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b8f7dc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b8f7dc2
Branch: refs/heads/master
Commit: 6b8f7dc2d818cbe87bdfbe8852cfec5507f77a5a
Parents: 5231c93
Author: kkloudas <kk...@gmail.com>
Authored: Thu Oct 26 19:11:03 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Sun Oct 29 16:10:32 2017 +0100
----------------------------------------------------------------------
.../HAQueryableStateRocksDBBackendITCase.java | 2 -
.../KVStateRequestSerializerRocksDBTest.java | 167 ------------------
...NonHAQueryableStateRocksDBBackendITCase.java | 2 -
.../KVStateRequestSerializerRocksDBTest.java | 168 +++++++++++++++++++
4 files changed, 168 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 18b167f..cae02e2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
-@Ignore
public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
@Rule
http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
deleted file mode 100644
index cb6fb3d..0000000
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.contrib.streaming.state.PredefinedOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.queryablestate.client.VoidNamespace;
-import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.network.KvStateRequestSerializerTest;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.File;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Additional tests for the serialization and deserialization using
- * the KvStateSerializer with a RocksDB state back-end.
- */
-public final class KVStateRequestSerializerRocksDBTest {
-
- @Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
- /**
- * Extension of {@link RocksDBKeyedStateBackend} to make {@link
- * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
- * the tests.
- *
- * @param <K> key type
- */
- static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
-
- RocksDBKeyedStateBackend2(
- final String operatorIdentifier,
- final ClassLoader userCodeClassLoader,
- final File instanceBasePath,
- final DBOptions dbOptions,
- final ColumnFamilyOptions columnFamilyOptions,
- final TaskKvStateRegistry kvStateRegistry,
- final TypeSerializer<K> keySerializer,
- final int numberOfKeyGroups,
- final KeyGroupRange keyGroupRange,
- final ExecutionConfig executionConfig) throws Exception {
-
- super(operatorIdentifier, userCodeClassLoader,
- instanceBasePath,
- dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
- numberOfKeyGroups, keyGroupRange, executionConfig, false);
- }
-
- @Override
- public <N, T> InternalListState<N, T> createListState(
- final TypeSerializer<N> namespaceSerializer,
- final ListStateDescriptor<T> stateDesc) throws Exception {
-
- return super.createListState(namespaceSerializer, stateDesc);
- }
- }
-
- /**
- * Tests list serialization and deserialization match.
- *
- * @see KvStateRequestSerializerTest#testListSerialization()
- * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
- * test
- */
- @Test
- public void testListSerialization() throws Exception {
- final long key = 0L;
-
- // objects for RocksDB state list serialisation
- DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
- dbOptions.setCreateIfMissing(true);
- ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
- final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
- new RocksDBKeyedStateBackend2<>(
- "no-op",
- ClassLoader.getSystemClassLoader(),
- temporaryFolder.getRoot(),
- dbOptions,
- columnFamilyOptions,
- mock(TaskKvStateRegistry.class),
- LongSerializer.INSTANCE,
- 1, new KeyGroupRange(0, 0),
- new ExecutionConfig()
- );
- longHeapKeyedStateBackend.restore(null);
- longHeapKeyedStateBackend.setCurrentKey(key);
-
- final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
- .createListState(VoidNamespaceSerializer.INSTANCE,
- new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
-
- KvStateRequestSerializerTest.testListSerialization(key, listState);
- }
-
- /**
- * Tests map serialization and deserialization match.
- *
- * @see KvStateRequestSerializerTest#testMapSerialization()
- * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
- * test
- */
- @Test
- public void testMapSerialization() throws Exception {
- final long key = 0L;
-
- // objects for RocksDB state list serialisation
- DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
- dbOptions.setCreateIfMissing(true);
- ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
- final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
- new RocksDBKeyedStateBackend<>(
- "no-op",
- ClassLoader.getSystemClassLoader(),
- temporaryFolder.getRoot(),
- dbOptions,
- columnFamilyOptions,
- mock(TaskKvStateRegistry.class),
- LongSerializer.INSTANCE,
- 1, new KeyGroupRange(0, 0),
- new ExecutionConfig(),
- false);
- longHeapKeyedStateBackend.restore(null);
- longHeapKeyedStateBackend.setCurrentKey(key);
-
- final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
- longHeapKeyedStateBackend.getPartitionedState(
- VoidNamespace.INSTANCE,
- VoidNamespaceSerializer.INSTANCE,
- new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
-
- KvStateRequestSerializerTest.testMapSerialization(key, mapState);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 39fbe9e..7778a94 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.junit.BeforeClass;
-import org.junit.Ignore;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
/**
* Several integration tests for queryable state using the {@link RocksDBStateBackend}.
*/
-@Ignore
public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
@Rule
http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..07517ab
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.queryablestate.client.VoidNamespace;
+import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization using
+ * the KvStateSerializer with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ /**
+ * Extension of {@link RocksDBKeyedStateBackend} to make {@link
+ * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
+ * the tests.
+ *
+ * @param <K> key type
+ */
+ static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+ RocksDBKeyedStateBackend2(
+ final String operatorIdentifier,
+ final ClassLoader userCodeClassLoader,
+ final File instanceBasePath,
+ final DBOptions dbOptions,
+ final ColumnFamilyOptions columnFamilyOptions,
+ final TaskKvStateRegistry kvStateRegistry,
+ final TypeSerializer<K> keySerializer,
+ final int numberOfKeyGroups,
+ final KeyGroupRange keyGroupRange,
+ final ExecutionConfig executionConfig) throws Exception {
+
+ super(operatorIdentifier, userCodeClassLoader,
+ instanceBasePath,
+ dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
+ numberOfKeyGroups, keyGroupRange, executionConfig, false);
+ }
+
+ @Override
+ public <N, T> InternalListState<N, T> createListState(
+ final TypeSerializer<N> namespaceSerializer,
+ final ListStateDescriptor<T> stateDesc) throws Exception {
+
+ return super.createListState(namespaceSerializer, stateDesc);
+ }
+ }
+
+ /**
+ * Tests list serialization and deserialization match.
+ *
+ * @see KvStateRequestSerializerTest#testListSerialization()
+ * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+ * test
+ */
+ @Test
+ public void testListSerialization() throws Exception {
+ final long key = 0L;
+
+ // objects for RocksDB state list serialisation
+ DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+ dbOptions.setCreateIfMissing(true);
+ ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+ final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
+ new RocksDBKeyedStateBackend2<>(
+ "no-op",
+ ClassLoader.getSystemClassLoader(),
+ temporaryFolder.getRoot(),
+ dbOptions,
+ columnFamilyOptions,
+ mock(TaskKvStateRegistry.class),
+ LongSerializer.INSTANCE,
+ 1, new KeyGroupRange(0, 0),
+ new ExecutionConfig()
+ );
+ longHeapKeyedStateBackend.restore(null);
+ longHeapKeyedStateBackend.setCurrentKey(key);
+
+ final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
+ .createListState(VoidNamespaceSerializer.INSTANCE,
+ new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
+ KvStateRequestSerializerTest.testListSerialization(key, listState);
+ longHeapKeyedStateBackend.dispose();
+ }
+
+ /**
+ * Tests map serialization and deserialization match.
+ *
+ * @see KvStateRequestSerializerTest#testMapSerialization()
+ * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
+ * test
+ */
+ @Test
+ public void testMapSerialization() throws Exception {
+ final long key = 0L;
+
+ // objects for RocksDB state list serialisation
+ DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+ dbOptions.setCreateIfMissing(true);
+ ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+ final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+ new RocksDBKeyedStateBackend<>(
+ "no-op",
+ ClassLoader.getSystemClassLoader(),
+ temporaryFolder.getRoot(),
+ dbOptions,
+ columnFamilyOptions,
+ mock(TaskKvStateRegistry.class),
+ LongSerializer.INSTANCE,
+ 1, new KeyGroupRange(0, 0),
+ new ExecutionConfig(),
+ false);
+ longHeapKeyedStateBackend.restore(null);
+ longHeapKeyedStateBackend.setCurrentKey(key);
+
+ final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
+ longHeapKeyedStateBackend.getPartitionedState(
+ VoidNamespace.INSTANCE,
+ VoidNamespaceSerializer.INSTANCE,
+ new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+ KvStateRequestSerializerTest.testMapSerialization(key, mapState);
+ longHeapKeyedStateBackend.dispose();
+ }
+}