You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/10/29 15:29:40 UTC

flink git commit: [FLINK-7880][QS] Fix QS test instabilities.

Repository: flink
Updated Branches:
  refs/heads/master 5231c9300 -> 6b8f7dc2d


[FLINK-7880][QS] Fix QS test instabilities.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b8f7dc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b8f7dc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b8f7dc2

Branch: refs/heads/master
Commit: 6b8f7dc2d818cbe87bdfbe8852cfec5507f77a5a
Parents: 5231c93
Author: kkloudas <kk...@gmail.com>
Authored: Thu Oct 26 19:11:03 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Sun Oct 29 16:10:32 2017 +0100

----------------------------------------------------------------------
 .../HAQueryableStateRocksDBBackendITCase.java   |   2 -
 .../KVStateRequestSerializerRocksDBTest.java    | 167 ------------------
 ...NonHAQueryableStateRocksDBBackendITCase.java |   2 -
 .../KVStateRequestSerializerRocksDBTest.java    | 168 +++++++++++++++++++
 4 files changed, 168 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
index 18b167f..cae02e2 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
  */
-@Ignore
 public class HAQueryableStateRocksDBBackendITCase extends HAAbstractQueryableStateTestBase {
 
 	@Rule

http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
deleted file mode 100644
index cb6fb3d..0000000
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/KVStateRequestSerializerRocksDBTest.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.queryablestate.itcases;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.contrib.streaming.state.PredefinedOptions;
-import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
-import org.apache.flink.queryablestate.client.VoidNamespace;
-import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.network.KvStateRequestSerializerTest;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-
-import java.io.File;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Additional tests for the serialization and deserialization using
- * the KvStateSerializer with a RocksDB state back-end.
- */
-public final class KVStateRequestSerializerRocksDBTest {
-
-	@Rule
-	public TemporaryFolder temporaryFolder = new TemporaryFolder();
-
-	/**
-	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
-	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
-	 * the tests.
-	 *
-	 * @param <K> key type
-	 */
-	static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
-
-		RocksDBKeyedStateBackend2(
-				final String operatorIdentifier,
-				final ClassLoader userCodeClassLoader,
-				final File instanceBasePath,
-				final DBOptions dbOptions,
-				final ColumnFamilyOptions columnFamilyOptions,
-				final TaskKvStateRegistry kvStateRegistry,
-				final TypeSerializer<K> keySerializer,
-				final int numberOfKeyGroups,
-				final KeyGroupRange keyGroupRange,
-				final ExecutionConfig executionConfig) throws Exception {
-
-			super(operatorIdentifier, userCodeClassLoader,
-				instanceBasePath,
-				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
-				numberOfKeyGroups, keyGroupRange, executionConfig, false);
-		}
-
-		@Override
-		public <N, T> InternalListState<N, T> createListState(
-			final TypeSerializer<N> namespaceSerializer,
-			final ListStateDescriptor<T> stateDesc) throws Exception {
-
-			return super.createListState(namespaceSerializer, stateDesc);
-		}
-	}
-
-	/**
-	 * Tests list serialization and deserialization match.
-	 *
-	 * @see KvStateRequestSerializerTest#testListSerialization()
-	 * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
-	 * test
-	 */
-	@Test
-	public void testListSerialization() throws Exception {
-		final long key = 0L;
-
-		// objects for RocksDB state list serialisation
-		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
-		dbOptions.setCreateIfMissing(true);
-		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
-		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
-			new RocksDBKeyedStateBackend2<>(
-				"no-op",
-				ClassLoader.getSystemClassLoader(),
-				temporaryFolder.getRoot(),
-				dbOptions,
-				columnFamilyOptions,
-				mock(TaskKvStateRegistry.class),
-				LongSerializer.INSTANCE,
-				1, new KeyGroupRange(0, 0),
-				new ExecutionConfig()
-			);
-		longHeapKeyedStateBackend.restore(null);
-		longHeapKeyedStateBackend.setCurrentKey(key);
-
-		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
-			.createListState(VoidNamespaceSerializer.INSTANCE,
-				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
-
-		KvStateRequestSerializerTest.testListSerialization(key, listState);
-	}
-
-	/**
-	 * Tests map serialization and deserialization match.
-	 *
-	 * @see KvStateRequestSerializerTest#testMapSerialization()
-	 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
-	 * test
-	 */
-	@Test
-	public void testMapSerialization() throws Exception {
-		final long key = 0L;
-
-		// objects for RocksDB state list serialisation
-		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
-		dbOptions.setCreateIfMissing(true);
-		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
-		final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
-			new RocksDBKeyedStateBackend<>(
-				"no-op",
-				ClassLoader.getSystemClassLoader(),
-				temporaryFolder.getRoot(),
-				dbOptions,
-				columnFamilyOptions,
-				mock(TaskKvStateRegistry.class),
-				LongSerializer.INSTANCE,
-				1, new KeyGroupRange(0, 0),
-				new ExecutionConfig(),
-				false);
-		longHeapKeyedStateBackend.restore(null);
-		longHeapKeyedStateBackend.setCurrentKey(key);
-
-		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
-				longHeapKeyedStateBackend.getPartitionedState(
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE,
-						new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
-
-		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
index 39fbe9e..7778a94 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/NonHAQueryableStateRocksDBBackendITCase.java
@@ -22,14 +22,12 @@ import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
 /**
  * Several integration tests for queryable state using the {@link RocksDBStateBackend}.
  */
-@Ignore
 public class NonHAQueryableStateRocksDBBackendITCase extends NonHAAbstractQueryableStateTestBase {
 
 	@Rule

http://git-wip-us.apache.org/repos/asf/flink/blob/6b8f7dc2/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
new file mode 100644
index 0000000..07517ab
--- /dev/null
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.queryablestate.network;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.contrib.streaming.state.PredefinedOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
+import org.apache.flink.queryablestate.client.VoidNamespace;
+import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.internal.InternalListState;
+import org.apache.flink.runtime.state.internal.InternalMapState;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Additional tests for the serialization and deserialization using
+ * the KvStateSerializer with a RocksDB state back-end.
+ */
+public final class KVStateRequestSerializerRocksDBTest {
+
+	@Rule
+	public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+	/**
+	 * Extension of {@link RocksDBKeyedStateBackend} to make {@link
+	 * #createListState(TypeSerializer, ListStateDescriptor)} public for use in
+	 * the tests.
+	 *
+	 * @param <K> key type
+	 */
+	static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {
+
+		RocksDBKeyedStateBackend2(
+				final String operatorIdentifier,
+				final ClassLoader userCodeClassLoader,
+				final File instanceBasePath,
+				final DBOptions dbOptions,
+				final ColumnFamilyOptions columnFamilyOptions,
+				final TaskKvStateRegistry kvStateRegistry,
+				final TypeSerializer<K> keySerializer,
+				final int numberOfKeyGroups,
+				final KeyGroupRange keyGroupRange,
+				final ExecutionConfig executionConfig) throws Exception {
+
+			super(operatorIdentifier, userCodeClassLoader,
+				instanceBasePath,
+				dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
+				numberOfKeyGroups, keyGroupRange, executionConfig, false);
+		}
+
+		@Override
+		public <N, T> InternalListState<N, T> createListState(
+			final TypeSerializer<N> namespaceSerializer,
+			final ListStateDescriptor<T> stateDesc) throws Exception {
+
+			return super.createListState(namespaceSerializer, stateDesc);
+		}
+	}
+
+	/**
+	 * Tests list serialization and deserialization match.
+	 *
+	 * @see KvStateRequestSerializerTest#testListSerialization()
+	 * KvStateRequestSerializerTest#testListSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testListSerialization() throws Exception {
+		final long key = 0L;
+
+		// objects for RocksDB state list serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend2<>(
+				"no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig()
+			);
+		longHeapKeyedStateBackend.restore(null);
+		longHeapKeyedStateBackend.setCurrentKey(key);
+
+		final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
+			.createListState(VoidNamespaceSerializer.INSTANCE,
+				new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
+
+		KvStateRequestSerializerTest.testListSerialization(key, listState);
+		longHeapKeyedStateBackend.dispose();
+	}
+
+	/**
+	 * Tests map serialization and deserialization match.
+	 *
+	 * @see KvStateRequestSerializerTest#testMapSerialization()
+	 * KvStateRequestSerializerTest#testMapSerialization() using the heap state back-end
+	 * test
+	 */
+	@Test
+	public void testMapSerialization() throws Exception {
+		final long key = 0L;
+
+		// objects for RocksDB state list serialisation
+		DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
+		dbOptions.setCreateIfMissing(true);
+		ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
+		final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
+			new RocksDBKeyedStateBackend<>(
+				"no-op",
+				ClassLoader.getSystemClassLoader(),
+				temporaryFolder.getRoot(),
+				dbOptions,
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				LongSerializer.INSTANCE,
+				1, new KeyGroupRange(0, 0),
+				new ExecutionConfig(),
+				false);
+		longHeapKeyedStateBackend.restore(null);
+		longHeapKeyedStateBackend.setCurrentKey(key);
+
+		final InternalMapState<VoidNamespace, Long, String> mapState = (InternalMapState<VoidNamespace, Long, String>)
+				longHeapKeyedStateBackend.getPartitionedState(
+						VoidNamespace.INSTANCE,
+						VoidNamespaceSerializer.INSTANCE,
+						new MapStateDescriptor<>("test", LongSerializer.INSTANCE, StringSerializer.INSTANCE));
+
+		KvStateRequestSerializerTest.testMapSerialization(key, mapState);
+		longHeapKeyedStateBackend.dispose();
+	}
+}