You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/12 18:19:49 UTC
[2/3] flink git commit: [FLINK-9701] Introduce TTL configuration in state descriptors
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
deleted file mode 100644
index 893f9ae..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.StreamSupport;
-
-/** Test suite for {@link TtlListState}. */
-public class TtlListStateTest
- extends TtlMergingStateBase<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
- @Override
- void initTestValues() {
- updater = v -> ttlState.addAll(v);
- getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
- originalGetter = () -> ttlState.original.get();
-
- emptyValue = Collections.emptyList();
-
- updateEmpty = Arrays.asList(5, 7, 10);
- updateUnexpired = Arrays.asList(8, 9, 11);
- updateExpired = Arrays.asList(1, 4);
-
- getUpdateEmpty = updateEmpty;
- getUnexpired = updateUnexpired;
- getUpdateExpired = updateExpired;
- }
-
- @Override
- TtlListState<?, String, Integer> createState() {
- ListStateDescriptor<Integer> listStateDesc =
- new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
- return (TtlListState<?, String, Integer>) wrapMockState(listStateDesc);
- }
-
- @Override
- List<Integer> generateRandomUpdate() {
- int size = RANDOM.nextInt(5);
- return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
- }
-
- @Override
- Iterable<Integer> getMergeResult(
- List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
- List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
- List<Integer> result = new ArrayList<>();
- finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
new file mode 100644
index 0000000..c113bf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/** Test suite for {@link TtlListState}. */
+class TtlListStateTestContext
+ extends TtlMergingStateTestContext<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
+ @Override
+ void initTestValues() {
+ emptyValue = Collections.emptyList();
+
+ updateEmpty = Arrays.asList(5, 7, 10);
+ updateUnexpired = Arrays.asList(8, 9, 11);
+ updateExpired = Arrays.asList(1, 4);
+
+ getUpdateEmpty = updateEmpty;
+ getUnexpired = updateUnexpired;
+ getUpdateExpired = updateExpired;
+ }
+
+ @Override
+ void update(List<Integer> value) throws Exception {
+ ttlState.addAll(value);
+ }
+
+ @Override
+ Iterable<Integer> get() throws Exception {
+ return StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.get() == null ? emptyValue : ttlState.original.get();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
+ }
+
+ @Override
+ List<Integer> generateRandomUpdate() {
+ int size = RANDOM.nextInt(5);
+ return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
+ }
+
+ @Override
+ Iterable<Integer> getMergeResult(
+ List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
+ List<Integer> result = new ArrayList<>();
+ finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
+ return result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
new file mode 100644
index 0000000..7fd61aa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/** Test suite for collection methods of {@link TtlMapState}. */
+class TtlMapStateAllEntriesTestContext extends
+ TtlMapStateTestContext<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
+
+ @Override
+ void initTestValues() {
+ emptyValue = Collections.emptySet();
+
+ updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
+ updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
+ updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+
+ getUpdateEmpty = updateEmpty.entrySet();
+ getUnexpired = updateUnexpired.entrySet();
+ getUpdateExpired = updateExpired.entrySet();
+ }
+
+ @SafeVarargs
+ private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
+ return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+ }
+
+ @Override
+ void update(Map<Integer, String> map) throws Exception {
+ ttlState.putAll(map);
+ }
+
+ @Override
+ Set<Map.Entry<Integer, String>> get() throws Exception {
+ return StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.entries() == null ? Collections.emptySet() : ttlState.original.entries();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
deleted file mode 100644
index d6949e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-/** Test suite for per element methods of {@link TtlMapState}. */
-public class TtlMapStatePerElementTest extends TtlMapStateTestBase<String, String> {
- private static final int TEST_KEY = 1;
- private static final String TEST_VAL1 = "test value1";
- private static final String TEST_VAL2 = "test value2";
- private static final String TEST_VAL3 = "test value3";
-
- @Override
- void initTestValues() {
- updater = v -> ttlState.put(TEST_KEY, v);
- getter = () -> ttlState.get(TEST_KEY);
- originalGetter = () -> ttlState.original.get(TEST_KEY);
-
- updateEmpty = TEST_VAL1;
- updateUnexpired = TEST_VAL2;
- updateExpired = TEST_VAL3;
-
- getUpdateEmpty = TEST_VAL1;
- getUnexpired = TEST_VAL2;
- getUpdateExpired = TEST_VAL3;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
new file mode 100644
index 0000000..fb025af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/** Test suite for per element methods of {@link TtlMapState}. */
+class TtlMapStatePerElementTestContext extends TtlMapStateTestContext<String, String> {
+ private static final int TEST_KEY = 1;
+ private static final String TEST_VAL1 = "test value1";
+ private static final String TEST_VAL2 = "test value2";
+ private static final String TEST_VAL3 = "test value3";
+
+ @Override
+ void initTestValues() {
+ updateEmpty = TEST_VAL1;
+ updateUnexpired = TEST_VAL2;
+ updateExpired = TEST_VAL3;
+
+ getUpdateEmpty = TEST_VAL1;
+ getUnexpired = TEST_VAL2;
+ getUpdateExpired = TEST_VAL3;
+ }
+
+ @Override
+ void update(String value) throws Exception {
+ ttlState.put(TEST_KEY, value);
+ }
+
+ @Override
+ String get() throws Exception {
+ return ttlState.get(TEST_KEY);
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.get(TEST_KEY);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
deleted file mode 100644
index bac2f41..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/** Test suite for collection methods of {@link TtlMapState}. */
-public class TtlMapStateTest extends
- TtlMapStateTestBase<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
-
- @Override
- void initTestValues() {
- updater = map -> ttlState.putAll(map);
- getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
- originalGetter = () -> ttlState.original.entries();
-
- emptyValue = Collections.emptySet();
-
- updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
- updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
- updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
-
- getUpdateEmpty = updateEmpty.entrySet();
- getUnexpired = updateUnexpired.entrySet();
- getUpdateExpired = updateExpired.entrySet();
- }
-
- @SafeVarargs
- private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
- return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
deleted file mode 100644
index dab3194..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-abstract class TtlMapStateTestBase<UV, GV>
- extends TtlStateTestBase<TtlMapState<?, String, Integer, String>, UV, GV> {
- @Override
- TtlMapState<?, String, Integer, String> createState() {
- MapStateDescriptor<Integer, String> mapStateDesc =
- new MapStateDescriptor<>("TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
- return (TtlMapState<?, String, Integer, String>) wrapMockState(mapStateDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
new file mode 100644
index 0000000..bb7b327
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+abstract class TtlMapStateTestContext<UV, GV>
+ extends TtlStateTestContextBase<TtlMapState<?, String, Integer, String>, UV, GV> {
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new MapStateDescriptor<>(
+ "TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
deleted file mode 100644
index 6a7aebe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.internal.InternalMergingState;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-abstract class TtlMergingStateBase<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
- extends TtlStateTestBase<S, UV, GV> {
- static final Random RANDOM = new Random();
-
- private static final List<String> NAMESPACES = Arrays.asList(
- "unsetNamespace1",
- "unsetNamespace2",
- "expiredNamespace",
- "expiredAndUpdatedNamespace",
- "unexpiredNamespace",
- "finalNamespace");
-
- @Test
- public void testMergeNamespaces() throws Exception {
- initTest();
-
- timeProvider.time = 0;
- List<Tuple2<String, UV>> expiredUpdatesToMerge = generateExpiredUpdatesToMerge();
- applyStateUpdates(expiredUpdatesToMerge);
-
- timeProvider.time = 120;
- List<Tuple2<String, UV>> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge();
- applyStateUpdates(unexpiredUpdatesToMerge);
-
- timeProvider.time = 150;
- List<Tuple2<String, UV>> finalUpdatesToMerge = generateFinalUpdatesToMerge();
- applyStateUpdates(finalUpdatesToMerge);
-
- timeProvider.time = 230;
- ttlState.mergeNamespaces("targetNamespace", NAMESPACES);
- ttlState.setCurrentNamespace("targetNamespace");
- assertEquals("Unexpected result of merge operation",
- getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get());
- }
-
- private List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
- return Arrays.asList(
- Tuple2.of("expiredNamespace", generateRandomUpdate()),
- Tuple2.of("expiredNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
- );
- }
-
- private List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
- return Arrays.asList(
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate())
- );
- }
-
- private List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
- return Arrays.asList(
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
- Tuple2.of("finalNamespace", generateRandomUpdate()),
- Tuple2.of("finalNamespace", generateRandomUpdate())
- );
- }
-
- abstract UV generateRandomUpdate();
-
- private void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
- for (Tuple2<String, UV> t : updates) {
- ttlState.setCurrentNamespace(t.f0);
- updater.accept(t.f1);
- }
- }
-
- abstract GV getMergeResult(
- List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
- List<Tuple2<String, UV>> finalUpdatesToMerge);
-
- @SuppressWarnings("unchecked")
- abstract static class TtlIntegerMergingStateBase<
- S extends InternalMergingState<?, String, ?, ?, GV>,
- UV extends Number, GV>
- extends TtlMergingStateBase<S, UV, GV> {
- @Override
- UV generateRandomUpdate() {
- return (UV) (Integer) RANDOM.nextInt(1000);
- }
-
- int getIntegerMergeResult(
- List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
- List<Tuple2<String, UV>> finalUpdatesToMerge) {
- return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
- finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
new file mode 100644
index 0000000..85c6134
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+abstract class TtlMergingStateTestContext<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
+ extends TtlStateTestContextBase<S, UV, GV> {
+ static final Random RANDOM = new Random();
+
+ static final List<String> NAMESPACES = Arrays.asList(
+ "unsetNamespace1",
+ "unsetNamespace2",
+ "expiredNamespace",
+ "expiredAndUpdatedNamespace",
+ "unexpiredNamespace",
+ "finalNamespace");
+
+ List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
+ return Arrays.asList(
+ Tuple2.of("expiredNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
+ );
+ }
+
+ List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
+ return Arrays.asList(
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate())
+ );
+ }
+
+ List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
+ return Arrays.asList(
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+ Tuple2.of("finalNamespace", generateRandomUpdate()),
+ Tuple2.of("finalNamespace", generateRandomUpdate())
+ );
+ }
+
+ abstract UV generateRandomUpdate();
+
+ void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
+ for (Tuple2<String, UV> t : updates) {
+ ttlState.setCurrentNamespace(t.f0);
+ update(t.f1);
+ }
+ }
+
+ abstract GV getMergeResult(
+ List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, UV>> finalUpdatesToMerge);
+
+ @SuppressWarnings("unchecked")
+ abstract static class TtlIntegerMergingStateTestContext<
+ S extends InternalMergingState<?, String, ?, ?, GV>,
+ UV extends Number, GV>
+ extends TtlMergingStateTestContext<S, UV, GV> {
+ @Override
+ UV generateRandomUpdate() {
+ return (UV) (Integer) RANDOM.nextInt(1000);
+ }
+
+ int getIntegerMergeResult(
+ List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, UV>> finalUpdatesToMerge) {
+ return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
+ finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
deleted file mode 100644
index bc5f67f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.List;
-
-/** Test suite for {@link TtlReducingState}. */
-public class TtlReducingStateTest
- extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlReducingState<?, String, Integer>, Integer, Integer> {
- @Override
- void initTestValues() {
- updater = v -> ttlState.add(v);
- getter = () -> ttlState.get();
- originalGetter = () -> ttlState.original.get();
-
- updateEmpty = 5;
- updateUnexpired = 7;
- updateExpired = 6;
-
- getUpdateEmpty = 5;
- getUnexpired = 12;
- getUpdateExpired = 6;
- }
-
- @Override
- TtlReducingState<?, String, Integer> createState() {
- ReducingStateDescriptor<Integer> aggregatingStateDes =
- new ReducingStateDescriptor<>("TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
- return (TtlReducingState<?, String, Integer>) wrapMockState(aggregatingStateDes);
- }
-
- @Override
- Integer getMergeResult(
- List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
- List<Tuple2<String, Integer>> finalUpdatesToMerge) {
- return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
- }
-
- private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
- if (v1 == null && v2 == null) {
- return null;
- } else if (v1 == null) {
- return v2;
- } else if (v2 == null) {
- return v1;
- } else {
- return v1 + v2;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
new file mode 100644
index 0000000..ea5e61b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.List;
+
+/** Test context for {@link TtlReducingState}: supplies the state descriptor, update values and expected results. */
+class TtlReducingStateTestContext
+ extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlReducingState<?, String, Integer>, Integer, Integer> {
+ @Override
+ void initTestValues() {
+ updateEmpty = 5;
+ updateUnexpired = 7;
+ updateExpired = 6;
+
+ getUpdateEmpty = 5;
+ getUnexpired = 12;
+ getUpdateExpired = 6;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new ReducingStateDescriptor<>(
+ "TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
+ }
+
+ @Override
+ void update(Integer value) throws Exception {
+ ttlState.add(value);
+ }
+
+ @Override
+ Integer get() throws Exception {
+ return ttlState.get();
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.get();
+ }
+
+ @Override
+ Integer getMergeResult(
+ List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+ return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
+ }
+
+ private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
+ if (v1 == null && v2 == null) {
+ return null;
+ } else if (v1 == null) {
+ return v2;
+ } else if (v2 == null) {
+ return v1;
+ } else {
+ return v1 + v2;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index bc3d6e7..5820f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -22,105 +22,153 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.function.SupplierWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.StateMigrationException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeThat;
-abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+/** State TTL base test suite. */
+@RunWith(Parameterized.class)
+public abstract class TtlStateTestBase {
private static final long TTL = 100;
- private static final KeyedStateFactory MOCK_ORIGINAL_STATE_FACTORY = new MockKeyedStateFactory();
- S ttlState;
- MockTimeProvider timeProvider;
- StateTtlConfiguration ttlConfig;
+ private MockTtlTimeProvider timeProvider;
+ private StateBackendTestContext sbetc;
+ private StateTtlConfiguration ttlConfig;
- ThrowingConsumer<UV, Exception> updater;
- SupplierWithException<GV, Exception> getter;
- SupplierWithException<?, Exception> originalGetter;
+ @Before
+ public void setup() {
+ timeProvider = new MockTtlTimeProvider();
+ sbetc = createStateBackendTestContext(timeProvider);
+ }
- UV updateEmpty;
- UV updateUnexpired;
- UV updateExpired;
+ protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider);
+
+ @Parameterized.Parameter
+ public TtlStateTestContextBase<?, ?, ?> ctx;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
+ return Arrays.asList(
+ new TtlValueStateTestContext(),
+ new TtlListStateTestContext(),
+ new TtlMapStateAllEntriesTestContext(),
+ new TtlMapStatePerElementTestContext(),
+ new TtlAggregatingStateTestContext(),
+ new TtlReducingStateTestContext(),
+ new TtlFoldingStateTestContext());
+ }
- GV getUpdateEmpty;
- GV getUnexpired;
- GV getUpdateExpired;
+ @SuppressWarnings("unchecked")
+ private <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
+ return (TtlStateTestContextBase<S, UV, ?>) ctx;
+ }
- GV emptyValue = null;
+ @SuppressWarnings("unchecked")
+ private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
+ return (TtlMergingStateTestContext<?, UV, ?>) ctx;
+ }
- void initTest() {
+ private void initTest() throws Exception {
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
}
- private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
+ private void initTest(
+ StateTtlConfiguration.TtlUpdateType updateType,
+ StateTtlConfiguration.TtlStateVisibility visibility) throws Exception {
initTest(updateType, visibility, TTL);
}
- private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
- timeProvider = new MockTimeProvider();
- StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
- ttlConfigBuilder.setTtlUpdateType(updateType)
- .setStateVisibility(visibility)
- .setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
- .setTtl(Time.milliseconds(ttl));
- ttlConfig = ttlConfigBuilder.build();
- ttlState = createState();
- initTestValues();
+ private void initTest(
+ StateTtlConfiguration.TtlUpdateType updateType,
+ StateTtlConfiguration.TtlStateVisibility visibility,
+ long ttl) throws Exception {
+ ttlConfig = StateTtlConfiguration
+ .newBuilder(Time.milliseconds(ttl))
+ .setTtlUpdateType(updateType)
+ .setStateVisibility(visibility)
+ .build();
+ sbetc.createAndRestoreKeyedStateBackend();
+ sbetc.restoreSnapshot(null);
+ createState();
+ ctx().initTestValues();
}
- abstract S createState();
-
- <SV, US extends State, IS extends US> IS wrapMockState(StateDescriptor<IS, SV> stateDesc) {
- try {
- return TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
- StringSerializer.INSTANCE, stateDesc,
- MOCK_ORIGINAL_STATE_FACTORY, ttlConfig, timeProvider);
- } catch (Exception e) {
- throw new FlinkRuntimeException("Unexpected exception wrapping mock state", e);
- }
+ @SuppressWarnings("unchecked")
+ private <S extends State> void createState() throws Exception {
+ StateDescriptor<S, Object> stateDescriptor = ctx().createStateDescriptor();
+ stateDescriptor.enableTimeToLive(ttlConfig);
+ ctx().ttlState =
+ (InternalKvState<?, String, ?>) sbetc.createState(stateDescriptor, "defaultNamespace");
}
- abstract void initTestValues();
+ private void takeAndRestoreSnapshot() throws Exception {
+ KeyedStateHandle snapshot = sbetc.takeSnapshot();
+ sbetc.createAndRestoreKeyedStateBackend();
+ sbetc.restoreSnapshot(snapshot);
+ createState();
+ }
@Test
public void testNonExistentValue() throws Exception {
initTest();
- assertEquals("Non-existing state should be empty", emptyValue, getter.get());
+ assertEquals("Non-existing state should be empty", ctx().emptyValue, ctx().get());
}
@Test
public void testExactExpirationOnWrite() throws Exception {
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
+ takeAndRestoreSnapshot();
+
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 20;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- updater.accept(updateUnexpired);
+ ctx().update(ctx().updateUnexpired);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 120;
- assertEquals("Unexpired state should be available after update", getUnexpired, getter.get());
+ assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 170;
- updater.accept(updateExpired);
+ ctx().update(ctx().updateExpired);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 220;
- assertEquals("Unexpired state should be available after update", getUpdateExpired, getter.get());
+ assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 300;
- assertEquals("Expired state should be unavailable", emptyValue, getter.get());
- assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
}
@Test
@@ -128,11 +176,14 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 120;
- assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
- assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+ assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+ assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
+ assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
}
@Test
@@ -140,17 +191,23 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 120;
- assertEquals("Unexpired state should be available after read", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available after read", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 250;
- assertEquals("Expired state should be unavailable", emptyValue, getter.get());
- assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
}
@Test
@@ -158,14 +215,18 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 170;
- assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
- assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+ assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+ assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
}
@Test
@@ -173,9 +234,154 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
timeProvider.time = 10;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 50;
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+ }
+
+ @Test
+ public void testMergeNamespaces() throws Exception {
+ assumeThat(ctx, instanceOf(TtlMergingStateTestContext.class));
+
+ initTest();
+
+ timeProvider.time = 0;
+ List<Tuple2<String, Object>> expiredUpdatesToMerge = mctx().generateExpiredUpdatesToMerge();
+ mctx().applyStateUpdates(expiredUpdatesToMerge);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 120;
+ List<Tuple2<String, Object>> unexpiredUpdatesToMerge = mctx().generateUnexpiredUpdatesToMerge();
+ mctx().applyStateUpdates(unexpiredUpdatesToMerge);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 150;
+ List<Tuple2<String, Object>> finalUpdatesToMerge = mctx().generateFinalUpdatesToMerge();
+ mctx().applyStateUpdates(finalUpdatesToMerge);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 230;
+ mctx().ttlState.mergeNamespaces("targetNamespace", TtlMergingStateTestContext.NAMESPACES);
+ mctx().ttlState.setCurrentNamespace("targetNamespace");
+ assertEquals("Unexpected result of merge operation",
+ mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), mctx().get());
+ }
+
+ @Test
+ public void testMultipleKeys() throws Exception {
+ testMultipleStateIds(id -> sbetc.setCurrentKey(id));
+ }
+
+ @Test
+ public void testMultipleNamespaces() throws Exception {
+ testMultipleStateIds(id -> ctx().ttlState.setCurrentNamespace(id));
+ }
+
+ private void testMultipleStateIds(Consumer<String> idChanger) throws Exception {
+ initTest();
+
+ timeProvider.time = 0;
+ idChanger.accept("id2");
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ idChanger.accept("id1");
+ ctx().update(ctx().updateEmpty);
+ idChanger.accept("id2");
+ ctx().update(ctx().updateUnexpired);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 120;
+ idChanger.accept("id1");
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+ idChanger.accept("id2");
+ assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 170;
+ idChanger.accept("id2");
+ ctx().update(ctx().updateExpired);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 230;
+ idChanger.accept("id1");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ idChanger.accept("id2");
+ assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 300;
+ idChanger.accept("id1");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ idChanger.accept("id2");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ }
+
+ @Test
+ public void testSnapshotChangeRestore() throws Exception {
+ initTest();
+
+ timeProvider.time = 0;
+ sbetc.setCurrentKey("k1");
+ ctx().update(ctx().updateEmpty);
+
+ timeProvider.time = 50;
+ sbetc.setCurrentKey("k1");
+ ctx().update(ctx().updateUnexpired);
+
+ timeProvider.time = 100;
+ sbetc.setCurrentKey("k2");
+ ctx().update(ctx().updateEmpty);
+
+ KeyedStateHandle snapshot = sbetc.takeSnapshot();
+
+ timeProvider.time = 170;
+ sbetc.setCurrentKey("k1");
+ ctx().update(ctx().updateExpired);
+ sbetc.setCurrentKey("k2");
+ ctx().update(ctx().updateUnexpired);
+
+ sbetc.createAndRestoreKeyedStateBackend();
+ sbetc.restoreSnapshot(snapshot);
+ createState();
+
+ timeProvider.time = 180;
+ sbetc.setCurrentKey("k1");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ sbetc.setCurrentKey("k2");
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+ }
+
+ @Test(expected = StateMigrationException.class)
+ public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
+ assumeThat(this, not(instanceOf(MockTtlStateTest.class)));
+
+ initTest();
+
+ timeProvider.time = 0;
+ ctx().update(ctx().updateEmpty);
+
+ KeyedStateHandle snapshot = sbetc.takeSnapshot();
+ sbetc.createAndRestoreKeyedStateBackend();
+
+ sbetc.restoreSnapshot(snapshot);
+ sbetc.createState(ctx().createStateDescriptor(), "");
+ }
+
+ @After
+ public void tearDown() {
+ sbetc.disposeKeyedStateBackend();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
new file mode 100644
index 0000000..d40bfb0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+abstract class TtlStateTestContextBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+ S ttlState;
+
+ UV updateEmpty;
+ UV updateUnexpired;
+ UV updateExpired;
+
+ GV getUpdateEmpty;
+ GV getUnexpired;
+ GV getUpdateExpired;
+
+ GV emptyValue = null;
+
+ abstract void initTestValues();
+
+ abstract <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor();
+
+ abstract void update(UV value) throws Exception;
+
+ abstract GV get() throws Exception;
+
+ abstract Object getOriginal() throws Exception;
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
deleted file mode 100644
index 8d9a4b4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlValueState}. */
-public class TtlValueStateTest extends TtlStateTestBase<TtlValueState<?, String, String>, String, String> {
- private static final String TEST_VAL1 = "test value1";
- private static final String TEST_VAL2 = "test value2";
- private static final String TEST_VAL3 = "test value3";
-
- @Override
- void initTestValues() {
- updater = v -> ttlState.update(v);
- getter = () -> ttlState.value();
- originalGetter = () -> ttlState.original.value();
-
- updateEmpty = TEST_VAL1;
- updateUnexpired = TEST_VAL2;
- updateExpired = TEST_VAL3;
-
- getUpdateEmpty = TEST_VAL1;
- getUnexpired = TEST_VAL2;
- getUpdateExpired = TEST_VAL3;
- }
-
- @Override
- TtlValueState<?, String, String> createState() {
- ValueStateDescriptor<String> valueStateDesc =
- new ValueStateDescriptor<>("TtlValueTestState", StringSerializer.INSTANCE);
- return (TtlValueState<?, String, String>) wrapMockState(valueStateDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
new file mode 100644
index 0000000..976d891
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test context for {@link TtlValueState}: supplies the state descriptor, update values and expected results. */
+class TtlValueStateTestContext extends TtlStateTestContextBase<TtlValueState<?, String, String>, String, String> {
+ private static final String TEST_VAL1 = "test value1";
+ private static final String TEST_VAL2 = "test value2";
+ private static final String TEST_VAL3 = "test value3";
+
+ @Override
+ void initTestValues() {
+ updateEmpty = TEST_VAL1;
+ updateUnexpired = TEST_VAL2;
+ updateExpired = TEST_VAL3;
+
+ getUpdateEmpty = TEST_VAL1;
+ getUnexpired = TEST_VAL2;
+ getUpdateExpired = TEST_VAL3;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new ValueStateDescriptor<>(
+ "TtlValueTestState", StringSerializer.INSTANCE);
+ }
+
+ @Override
+ void update(String value) throws Exception {
+ ttlState.update(value);
+ }
+
+ @Override
+ String get() throws Exception {
+ return ttlState.value();
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.value();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
index 439ca7f2..3f94669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
@@ -21,14 +21,12 @@ package org.apache.flink.runtime.state.ttl.mock;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
/** In memory mock internal state base class. */
-class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
- private Map<N, T> namespacedValues = new HashMap<>();
- private T defaultNamespaceValue;
+abstract class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
+ Supplier<Map<Object, Object>> values;
private N currentNamespace;
private final Supplier<T> emptyValue;
@@ -38,7 +36,6 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
MockInternalKvState(Supplier<T> emptyValue) {
this.emptyValue = emptyValue;
- defaultNamespaceValue = emptyValue.get();
}
@Override
@@ -72,26 +69,20 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
@Override
public void clear() {
- if (currentNamespace == null) {
- defaultNamespaceValue = emptyValue.get();
- } else {
- namespacedValues.remove(currentNamespace);
- }
+ getCurrentKeyValues().remove(currentNamespace);
}
+ @SuppressWarnings("unchecked")
public T getInternal() {
- T value = currentNamespace == null ? defaultNamespaceValue :
- namespacedValues.getOrDefault(currentNamespace, emptyValue.get());
- updateInternal(value);
- return value;
+ return (T) getCurrentKeyValues().computeIfAbsent(currentNamespace, n -> emptyValue.get());
}
@SuppressWarnings("WeakerAccess")
public void updateInternal(T valueToStore) {
- if (currentNamespace == null) {
- defaultNamespaceValue = valueToStore;
- } else {
- namespacedValues.put(currentNamespace, valueToStore);
- }
+ getCurrentKeyValues().put(currentNamespace, valueToStore);
+ }
+
+ private Map<Object, Object> getCurrentKeyValues() {
+ return values.get();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 386ef97..9b5ac10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -68,21 +68,17 @@ public class MockInternalMapState<K, N, UK, UV>
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
- return copy().entrySet();
- }
-
- private Map<UK, UV> copy() {
- return new HashMap<>(getInternal());
+ return getInternal().entrySet();
}
@Override
public Iterable<UK> keys() {
- return copy().keySet();
+ return getInternal().keySet();
}
@Override
public Iterable<UV> values() {
- return copy().values();
+ return getInternal().values();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
new file mode 100644
index 0000000..363ecf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** State backend which produces in memory mock state objects. */
+public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+ @SuppressWarnings("deprecation")
+ private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
+ Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
+ Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
+ Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
+ Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
+ Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private final Map<String, Map<K, Map<Object, Object>>> stateValues = new HashMap<>();
+
+ MockKeyedStateBackend(
+ TaskKvStateRegistry kvStateRegistry,
+ TypeSerializer<K> keySerializer,
+ ClassLoader userCodeClassLoader,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ ExecutionConfig executionConfig,
+ TtlTimeProvider ttlTimeProvider) {
+ super(kvStateRegistry, keySerializer, userCodeClassLoader,
+ numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <N, SV, S extends State, IS extends S> IS createInternalState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), TtlStateFactory.class);
+ throw new FlinkRuntimeException(message);
+ }
+ IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
+ ((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
+ .computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
+ .computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
+ return state;
+ }
+
+ @Override
+ public int numStateEntries() {
+ int count = 0;
+ for (String state : stateValues.keySet()) {
+ for (K key : stateValues.get(state).keySet()) {
+ count += stateValues.get(state).get(key).size();
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // noop
+ }
+
+ @Override
+ public <N> Stream<K> getKeys(String state, N namespace) {
+ return stateValues.getOrDefault(state, Collections.emptyMap()).entrySet().stream() // unknown state name yields an empty stream instead of an NPE
+ .filter(e -> e.getValue().containsKey(namespace))
+ .map(Map.Entry::getKey);
+ }
+
+ @Override
+ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+ long checkpointId,
+ long timestamp,
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) {
+ return new FutureTask<>(() -> SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues))));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void restore(Collection<KeyedStateHandle> state) {
+ stateValues.clear();
+ state = state == null ? Collections.emptyList() : state;
+ state.forEach(ksh -> stateValues.putAll(copy(((MockKeyedStateHandle<K>) ksh).snapshotStates)));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K> Map<String, Map<K, Map<Object, Object>>> copy(
+ Map<String, Map<K, Map<Object, Object>>> stateValues) {
+ Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>();
+ for (String stateName : stateValues.keySet()) {
+ Map<K, Map<Object, Object>> keyedValues = snapshotStates.computeIfAbsent(stateName, s -> new HashMap<>());
+ for (K key : stateValues.get(stateName).keySet()) {
+ Map<Object, Object> values = keyedValues.computeIfAbsent(key, s -> new HashMap<>());
+ for (Object namespace : stateValues.get(stateName).get(key).keySet()) {
+ Object value = stateValues.get(stateName).get(key).get(namespace);
+ value = value instanceof List ? new ArrayList<>((List) value) : value;
+ value = value instanceof Map ? new HashMap<>((Map) value) : value;
+ values.put(namespace, value);
+ }
+ }
+ }
+ return snapshotStates;
+ }
+
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T>
+ create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ return new HeapPriorityQueueSet<>(
+ elementPriorityComparator,
+ keyExtractor,
+ 0,
+ keyGroupRange,
+ 0);
+ }
+
+ private static class MockKeyedStateHandle<K> implements KeyedStateHandle {
+ private static final long serialVersionUID = 1L;
+
+ final Map<String, Map<K, Map<Object, Object>>> snapshotStates;
+
+ MockKeyedStateHandle(Map<String, Map<K, Map<Object, Object>>> snapshotStates) {
+ this.snapshotStates = snapshotStates;
+ }
+
+ @Override
+ public void discardState() {
+ snapshotStates.clear();
+ }
+
+ @Override
+ public long getStateSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void registerSharedStates(SharedStateRegistry stateRegistry) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyGroupRange getKeyGroupRange() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
deleted file mode 100644
index d843352..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl.mock;
-
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyedStateFactory;
-import org.apache.flink.runtime.state.ttl.TtlStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** State factory which produces in memory mock state objects. */
-public class MockKeyedStateFactory implements KeyedStateFactory {
- @SuppressWarnings("deprecation")
- private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
- Stream.of(
- Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
- Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
- Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
- Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
- Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
- Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
- ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-
- @Override
- public <N, SV, S extends State, IS extends S> IS createState(
- TypeSerializer<N> namespaceSerializer,
- StateDescriptor<S, SV> stateDesc) throws Exception {
- KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
- if (stateFactory == null) {
- String message = String.format("State %s is not supported by %s",
- stateDesc.getClass(), TtlStateFactory.class);
- throw new FlinkRuntimeException(message);
- }
- return stateFactory.createState(namespaceSerializer, stateDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
new file mode 100644
index 0000000..a8d49dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nullable;
+
+/** Mock state backend. */
+public class MockStateBackend extends AbstractStateBackend {
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) {
+ return new CheckpointStorage() {
+ @Override
+ public boolean supportsHighlyAvailableStorage() {
+ return false;
+ }
+
+ @Override
+ public boolean hasDefaultSavepointLocation() {
+ return false;
+ }
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) {
+ return new MockKeyedStateBackend<>(
+ new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()),
+ keySerializer,
+ env.getUserClassLoader(),
+ numberOfKeyGroups,
+ keyGroupRange,
+ env.getExecutionConfig(),
+ ttlTimeProvider);
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6b4e3e9..3bf0aea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
@@ -270,10 +271,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ExecutionConfig executionConfig,
boolean enableIncrementalCheckpointing,
LocalRecoveryConfig localRecoveryConfig,
- RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType
+ RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
+ TtlTimeProvider ttlTimeProvider
) throws IOException {
- super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
+ super(kvStateRegistry, keySerializer, userCodeClassLoader,
+ numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
@@ -1347,7 +1350,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, SV, S extends State, IS extends S> IS createState(
+ public <N, SV, S extends State, IS extends S> IS createInternalState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 998521b..1794e17 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TernaryBoolean;
@@ -410,7 +411,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws IOException {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws IOException {
// first, make sure that the RocksDB JNI library is loaded
// we do this explicitly here to have better error handling
@@ -442,7 +444,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
env.getExecutionConfig(),
isIncrementalCheckpointsEnabled(),
localRecoveryConfig,
- priorityQueueStateType);
+ priorityQueueStateType,
+ ttlTimeProvider);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 69069d6..6b254ce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
@@ -241,7 +242,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
new ExecutionConfig(),
enableIncrementalCheckpointing,
TestLocalRecoveryConfig.disabled(),
- RocksDBStateBackend.PriorityQueueStateType.HEAP);
+ RocksDBStateBackend.PriorityQueueStateType.HEAP,
+ TtlTimeProvider.DEFAULT);
verify(columnFamilyOptions, Mockito.times(1))
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);