You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sr...@apache.org on 2018/07/12 18:19:48 UTC
[1/3] flink git commit: [FLINK-9701] Introduce TTL configuration in
state descriptors
Repository: flink
Updated Branches:
refs/heads/master b407ba792 -> f45b7f7ff
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java
new file mode 100644
index 0000000..a590828
--- /dev/null
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBTtlStateTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.StateBackendTestContext;
+import org.apache.flink.runtime.state.ttl.TtlStateTestBase;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.TernaryBoolean;
+
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+
+/**
+ * Test suite for rocksdb state TTL.
+ *
+ * <p>Runs the tests inherited from {@link TtlStateTestBase} against a {@link RocksDBStateBackend}
+ * whose storage and checkpoint directories are created under {@link #tempFolder}.
+ */
+public class RocksDBTtlStateTest extends TtlStateTestBase {
+	/** Source of the temporary directories handed to the RocksDB backend. */
+	@Rule
+	public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Creates a test context whose state backend is the RocksDB backend built by this test.
+	 *
+	 * @param timeProvider time provider passed through to the {@link StateBackendTestContext}
+	 * @return a context that delegates backend creation to {@link #createStateBackend()}
+	 */
+	@Override
+	protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
+		return new StateBackendTestContext(timeProvider) {
+			@Override
+			protected StateBackend createStateBackend() {
+				return RocksDBTtlStateTest.this.createStateBackend();
+			}
+		};
+	}
+
+	/**
+	 * Builds a {@link RocksDBStateBackend} on top of an {@link FsStateBackend}, using two fresh
+	 * folders from {@link #tempFolder}: one as the db storage path, one as the checkpoint path.
+	 *
+	 * @return the configured backend
+	 * @throws FlinkRuntimeException if a temporary folder cannot be created; the underlying
+	 *     {@link IOException} is attached as the cause
+	 */
+	private StateBackend createStateBackend() {
+		String dbPath;
+		String checkpointPath;
+		try {
+			dbPath = tempFolder.newFolder().getAbsolutePath();
+			checkpointPath = tempFolder.newFolder().toURI().toString();
+		} catch (IOException e) {
+			// Keep the I/O failure as the cause so the reason shows up in the test report.
+			throw new FlinkRuntimeException("Failed to init rocksdb test state backend", e);
+		}
+		// NOTE(review): TernaryBoolean.FALSE presumably turns incremental checkpointing off;
+		// confirm against the RocksDBStateBackend constructor.
+		RocksDBStateBackend backend = new RocksDBStateBackend(new FsStateBackend(checkpointPath), TernaryBoolean.FALSE);
+		backend.setDbStoragePath(dbPath);
+		return backend;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index 594f337..ca9cb0b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CloseableIterable;
@@ -270,7 +271,8 @@ public class StreamTaskStateInitializerImpl implements StreamTaskStateInitialize
keySerializer,
taskInfo.getMaxNumberOfParallelSubtasks(),
keyGroupRange,
- environment.getTaskKvStateRegistry()),
+ environment.getTaskKvStateRegistry(),
+ TtlTimeProvider.DEFAULT),
backendCloseableRegistry,
logDescription);
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 000cf31..6233c4c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImplTest;
import org.apache.flink.runtime.state.TestTaskLocalStateStore;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
@@ -138,7 +139,8 @@ public class StreamTaskStateInitializerImplTest {
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups, KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws Exception {
return mock(AbstractKeyedStateBackend.class);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index e5558f6..aebad54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
@@ -261,7 +262,8 @@ public class StreamTaskTerminationTest extends TestLogger {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
index 914326b..214fab5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSpyWrapperStateBackend.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.Preconditions;
import java.io.IOException;
@@ -53,7 +54,8 @@ public class TestSpyWrapperStateBackend extends AbstractStateBackend {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws IOException {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws IOException {
return spy(delegate.createKeyedStateBackend(
env,
jobID,
@@ -61,7 +63,8 @@ public class TestSpyWrapperStateBackend extends AbstractStateBackend {
keySerializer,
numberOfKeyGroups,
keyGroupRange,
- kvStateRegistry));
+ kvStateRegistry,
+ ttlTimeProvider));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 46c408e..f3bae16 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
@@ -105,13 +106,14 @@ public class StateBackendITCase extends AbstractTestBase {
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
- Environment env,
- JobID jobID,
- String operatorIdentifier,
- TypeSerializer<K> keySerializer,
- int numberOfKeyGroups,
- KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws IOException {
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws IOException {
throw new SuccessException();
}
[2/3] flink git commit: [FLINK-9701] Introduce TTL configuration in
state descriptors
Posted by sr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
deleted file mode 100644
index 893f9ae..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import java.util.stream.StreamSupport;
-
-/** Test suite for {@link TtlListState}. */
-public class TtlListStateTest
- extends TtlMergingStateBase<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
- @Override
- void initTestValues() {
- updater = v -> ttlState.addAll(v);
- getter = () -> StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
- originalGetter = () -> ttlState.original.get();
-
- emptyValue = Collections.emptyList();
-
- updateEmpty = Arrays.asList(5, 7, 10);
- updateUnexpired = Arrays.asList(8, 9, 11);
- updateExpired = Arrays.asList(1, 4);
-
- getUpdateEmpty = updateEmpty;
- getUnexpired = updateUnexpired;
- getUpdateExpired = updateExpired;
- }
-
- @Override
- TtlListState<?, String, Integer> createState() {
- ListStateDescriptor<Integer> listStateDesc =
- new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
- return (TtlListState<?, String, Integer>) wrapMockState(listStateDesc);
- }
-
- @Override
- List<Integer> generateRandomUpdate() {
- int size = RANDOM.nextInt(5);
- return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
- }
-
- @Override
- Iterable<Integer> getMergeResult(
- List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
- List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
- List<Integer> result = new ArrayList<>();
- finalUpdatesToMerge.forEach(t -> result.addAll(t.f1));
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
new file mode 100644
index 0000000..c113bf1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlListStateTestContext.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Test context for {@link TtlListState}: provides the list test values and the accessors
+ * (update / get / getOriginal) that operate on the list state under test.
+ */
+class TtlListStateTestContext
+	extends TtlMergingStateTestContext<TtlListState<?, String, Integer>, List<Integer>, Iterable<Integer>> {
+	/** Sets the fixed update values; each expected read value is the list that was written. */
+	@Override
+	void initTestValues() {
+		emptyValue = Collections.emptyList();
+
+		updateEmpty = Arrays.asList(5, 7, 10);
+		updateUnexpired = Arrays.asList(8, 9, 11);
+		updateExpired = Arrays.asList(1, 4);
+
+		getUpdateEmpty = updateEmpty;
+		getUnexpired = updateUnexpired;
+		getUpdateExpired = updateExpired;
+	}
+
+	/** Appends all given elements to the TTL list state. */
+	@Override
+	void update(List<Integer> value) throws Exception {
+		ttlState.addAll(value);
+	}
+
+	/** Reads the TTL list state and copies its elements into a new list. */
+	@Override
+	Iterable<Integer> get() throws Exception {
+		return StreamSupport.stream(ttlState.get().spliterator(), false).collect(Collectors.toList());
+	}
+
+	/**
+	 * Returns the content of the wrapped (original) state, or {@code emptyValue} if the wrapped
+	 * state returns {@code null}.
+	 */
+	@Override
+	Object getOriginal() throws Exception {
+		// Read the wrapped state once so the null check and the returned value come from the same read.
+		Object original = ttlState.original.get();
+		return original == null ? emptyValue : original;
+	}
+
+	/** Creates the descriptor of the list state under test, with an int element serializer. */
+	@SuppressWarnings("unchecked")
+	@Override
+	<US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+		return (StateDescriptor<US, SV>) new ListStateDescriptor<>("TtlTestListState", IntSerializer.INSTANCE);
+	}
+
+	/** Generates a list of 0 to 4 random integers, each in the range [0, 100). */
+	@Override
+	List<Integer> generateRandomUpdate() {
+		int size = RANDOM.nextInt(5);
+		return IntStream.range(0, size).mapToObj(i -> RANDOM.nextInt(100)).collect(Collectors.toList());
+	}
+
+	/**
+	 * Computes the expected merge result: the concatenation, in order, of the lists in
+	 * {@code finalUpdatesToMerge}. {@code unexpiredUpdatesToMerge} is not used here.
+	 */
+	@Override
+	Iterable<Integer> getMergeResult(
+		List<Tuple2<String, List<Integer>>> unexpiredUpdatesToMerge,
+		List<Tuple2<String, List<Integer>>> finalUpdatesToMerge) {
+		List<Integer> result = new ArrayList<>();
+		for (Tuple2<String, List<Integer>> update : finalUpdatesToMerge) {
+			result.addAll(update.f1);
+		}
+		return result;
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
new file mode 100644
index 0000000..7fd61aa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateAllEntriesTestContext.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * Test context for the collection methods of {@link TtlMapState}: whole maps are written with
+ * {@code putAll} and read back as the set of entries.
+ */
+class TtlMapStateAllEntriesTestContext extends
+	TtlMapStateTestContext<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
+
+	/** Sets the fixed update maps; each expected read value is the entry set of the map written. */
+	@Override
+	void initTestValues() {
+		emptyValue = Collections.emptySet();
+
+		updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
+		updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
+		updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
+
+		getUpdateEmpty = updateEmpty.entrySet();
+		getUnexpired = updateUnexpired.entrySet();
+		getUpdateExpired = updateExpired.entrySet();
+	}
+
+	/** Builds a map from (key, value) tuples; {@code Collectors.toMap} rejects duplicate keys. */
+	@SafeVarargs
+	private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
+		return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+	}
+
+	/** Writes all entries of the given map into the TTL map state. */
+	@Override
+	void update(Map<Integer, String> map) throws Exception {
+		ttlState.putAll(map);
+	}
+
+	/** Reads all entries of the TTL map state and collects them into a set. */
+	@Override
+	Set<Map.Entry<Integer, String>> get() throws Exception {
+		return StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
+	}
+
+	/**
+	 * Returns the entries of the wrapped (original) state, or an empty set if the wrapped state
+	 * returns {@code null}.
+	 */
+	@Override
+	Object getOriginal() throws Exception {
+		// Read the wrapped state once so the null check and the returned value come from the same read.
+		Object original = ttlState.original.entries();
+		return original == null ? Collections.emptySet() : original;
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
deleted file mode 100644
index d6949e7..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-/** Test suite for per element methods of {@link TtlMapState}. */
-public class TtlMapStatePerElementTest extends TtlMapStateTestBase<String, String> {
- private static final int TEST_KEY = 1;
- private static final String TEST_VAL1 = "test value1";
- private static final String TEST_VAL2 = "test value2";
- private static final String TEST_VAL3 = "test value3";
-
- @Override
- void initTestValues() {
- updater = v -> ttlState.put(TEST_KEY, v);
- getter = () -> ttlState.get(TEST_KEY);
- originalGetter = () -> ttlState.original.get(TEST_KEY);
-
- updateEmpty = TEST_VAL1;
- updateUnexpired = TEST_VAL2;
- updateExpired = TEST_VAL3;
-
- getUpdateEmpty = TEST_VAL1;
- getUnexpired = TEST_VAL2;
- getUpdateExpired = TEST_VAL3;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
new file mode 100644
index 0000000..fb025af
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStatePerElementTestContext.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+/**
+ * Test context for the per element methods of {@link TtlMapState}: every update and read goes
+ * through the single map key {@link #TEST_KEY}.
+ */
+class TtlMapStatePerElementTestContext extends TtlMapStateTestContext<String, String> {
+	private static final int TEST_KEY = 1;
+	private static final String TEST_VAL1 = "test value1";
+	private static final String TEST_VAL2 = "test value2";
+	private static final String TEST_VAL3 = "test value3";
+
+	/** Pairs each update value with the identical value expected when it is read back. */
+	@Override
+	void initTestValues() {
+		updateEmpty = TEST_VAL1;
+		getUpdateEmpty = TEST_VAL1;
+
+		updateUnexpired = TEST_VAL2;
+		getUnexpired = TEST_VAL2;
+
+		updateExpired = TEST_VAL3;
+		getUpdateExpired = TEST_VAL3;
+	}
+
+	/** Stores the value under {@link #TEST_KEY} in the TTL map state. */
+	@Override
+	void update(String value) throws Exception {
+		ttlState.put(TEST_KEY, value);
+	}
+
+	/** Looks up {@link #TEST_KEY} in the TTL map state. */
+	@Override
+	String get() throws Exception {
+		return ttlState.get(TEST_KEY);
+	}
+
+	/** Looks up {@link #TEST_KEY} directly in the wrapped (original) state. */
+	@Override
+	Object getOriginal() throws Exception {
+		return ttlState.original.get(TEST_KEY);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
deleted file mode 100644
index bac2f41..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
-
-/** Test suite for collection methods of {@link TtlMapState}. */
-public class TtlMapStateTest extends
- TtlMapStateTestBase<Map<Integer, String>, Set<Map.Entry<Integer, String>>> {
-
- @Override
- void initTestValues() {
- updater = map -> ttlState.putAll(map);
- getter = () -> StreamSupport.stream(ttlState.entries().spliterator(), false).collect(Collectors.toSet());
- originalGetter = () -> ttlState.original.entries();
-
- emptyValue = Collections.emptySet();
-
- updateEmpty = mapOf(Tuple2.of(3, "3"), Tuple2.of(5, "5"), Tuple2.of(10, "10"));
- updateUnexpired = mapOf(Tuple2.of(12, "12"), Tuple2.of(7, "7"));
- updateExpired = mapOf(Tuple2.of(15, "15"), Tuple2.of(4, "4"));
-
- getUpdateEmpty = updateEmpty.entrySet();
- getUnexpired = updateUnexpired.entrySet();
- getUpdateExpired = updateExpired.entrySet();
- }
-
- @SafeVarargs
- private static <UK, UV> Map<UK, UV> mapOf(Tuple2<UK, UV> ... entries) {
- return Arrays.stream(entries).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
deleted file mode 100644
index dab3194..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestBase.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-abstract class TtlMapStateTestBase<UV, GV>
- extends TtlStateTestBase<TtlMapState<?, String, Integer, String>, UV, GV> {
- @Override
- TtlMapState<?, String, Integer, String> createState() {
- MapStateDescriptor<Integer, String> mapStateDesc =
- new MapStateDescriptor<>("TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
- return (TtlMapState<?, String, Integer, String>) wrapMockState(mapStateDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
new file mode 100644
index 0000000..bb7b327
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMapStateTestContext.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+abstract class TtlMapStateTestContext<UV, GV> // base context for TtlMapState tests; subclasses fix the update (UV) and get (GV) types
+ extends TtlStateTestContextBase<TtlMapState<?, String, Integer, String>, UV, GV> {
+ @SuppressWarnings("unchecked") // the cast below adapts the concrete descriptor to the caller-chosen <US, SV>
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new MapStateDescriptor<>( // Integer keys, String values
+ "TtlTestMapState", IntSerializer.INSTANCE, StringSerializer.INSTANCE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
deleted file mode 100644
index 6a7aebe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateBase.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.internal.InternalMergingState;
-
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Random;
-
-import static org.junit.Assert.assertEquals;
-
-abstract class TtlMergingStateBase<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV>
- extends TtlStateTestBase<S, UV, GV> {
- static final Random RANDOM = new Random();
-
- private static final List<String> NAMESPACES = Arrays.asList(
- "unsetNamespace1",
- "unsetNamespace2",
- "expiredNamespace",
- "expiredAndUpdatedNamespace",
- "unexpiredNamespace",
- "finalNamespace");
-
- @Test
- public void testMergeNamespaces() throws Exception {
- initTest();
-
- timeProvider.time = 0;
- List<Tuple2<String, UV>> expiredUpdatesToMerge = generateExpiredUpdatesToMerge();
- applyStateUpdates(expiredUpdatesToMerge);
-
- timeProvider.time = 120;
- List<Tuple2<String, UV>> unexpiredUpdatesToMerge = generateUnexpiredUpdatesToMerge();
- applyStateUpdates(unexpiredUpdatesToMerge);
-
- timeProvider.time = 150;
- List<Tuple2<String, UV>> finalUpdatesToMerge = generateFinalUpdatesToMerge();
- applyStateUpdates(finalUpdatesToMerge);
-
- timeProvider.time = 230;
- ttlState.mergeNamespaces("targetNamespace", NAMESPACES);
- ttlState.setCurrentNamespace("targetNamespace");
- assertEquals("Unexpected result of merge operation",
- getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), getter.get());
- }
-
- private List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
- return Arrays.asList(
- Tuple2.of("expiredNamespace", generateRandomUpdate()),
- Tuple2.of("expiredNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
- );
- }
-
- private List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
- return Arrays.asList(
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate())
- );
- }
-
- private List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
- return Arrays.asList(
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
- Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
- Tuple2.of("finalNamespace", generateRandomUpdate()),
- Tuple2.of("finalNamespace", generateRandomUpdate())
- );
- }
-
- abstract UV generateRandomUpdate();
-
- private void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
- for (Tuple2<String, UV> t : updates) {
- ttlState.setCurrentNamespace(t.f0);
- updater.accept(t.f1);
- }
- }
-
- abstract GV getMergeResult(
- List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
- List<Tuple2<String, UV>> finalUpdatesToMerge);
-
- @SuppressWarnings("unchecked")
- abstract static class TtlIntegerMergingStateBase<
- S extends InternalMergingState<?, String, ?, ?, GV>,
- UV extends Number, GV>
- extends TtlMergingStateBase<S, UV, GV> {
- @Override
- UV generateRandomUpdate() {
- return (UV) (Integer) RANDOM.nextInt(1000);
- }
-
- int getIntegerMergeResult(
- List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
- List<Tuple2<String, UV>> finalUpdatesToMerge) {
- return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
- finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
new file mode 100644
index 0000000..85c6134
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlMergingStateTestContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.internal.InternalMergingState;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+abstract class TtlMergingStateTestContext<S extends InternalMergingState<?, String, ?, ?, GV>, UV, GV> // context for TTL states whose namespaces can be merged
+ extends TtlStateTestContextBase<S, UV, GV> {
+ static final Random RANDOM = new Random(); // unseeded, so generated updates differ from run to run
+ /** Source namespaces the test merges into the target; the two "unset" ones never receive an update from this class. */
+ static final List<String> NAMESPACES = Arrays.asList(
+ "unsetNamespace1",
+ "unsetNamespace2",
+ "expiredNamespace",
+ "expiredAndUpdatedNamespace",
+ "unexpiredNamespace",
+ "finalNamespace");
+ /** First batch of updates; TtlStateTestBase#testMergeNamespaces does not include it in the expected merge result. */
+ List<Tuple2<String, UV>> generateExpiredUpdatesToMerge() {
+ return Arrays.asList(
+ Tuple2.of("expiredNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate())
+ );
+ }
+ /** Second batch of updates; it is passed to getMergeResult as the "unexpired" list. */
+ List<Tuple2<String, UV>> generateUnexpiredUpdatesToMerge() {
+ return Arrays.asList(
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate())
+ );
+ }
+ /** Last batch of updates; it is passed to getMergeResult as the "final" list. */
+ List<Tuple2<String, UV>> generateFinalUpdatesToMerge() {
+ return Arrays.asList(
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("expiredAndUpdatedNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+ Tuple2.of("unexpiredNamespace", generateRandomUpdate()),
+ Tuple2.of("finalNamespace", generateRandomUpdate()),
+ Tuple2.of("finalNamespace", generateRandomUpdate())
+ );
+ }
+ /** Produces one update value; the integer context below draws it from RANDOM. */
+ abstract UV generateRandomUpdate();
+ /** Applies each (namespace, value) pair in list order: selects the namespace on ttlState, then calls update(value). */
+ void applyStateUpdates(List<Tuple2<String, UV>> updates) throws Exception {
+ for (Tuple2<String, UV> t : updates) {
+ ttlState.setCurrentNamespace(t.f0);
+ update(t.f1);
+ }
+ }
+ /** Expected get() value on the merge target, derived from the unexpired and final update batches. */
+ abstract GV getMergeResult(
+ List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, UV>> finalUpdatesToMerge);
+ /** Merging context whose updates are Integers in [0, 1000); offers a sum-based helper for the expected merge result. */
+ @SuppressWarnings("unchecked") // generateRandomUpdate casts an Integer to UV
+ abstract static class TtlIntegerMergingStateTestContext<
+ S extends InternalMergingState<?, String, ?, ?, GV>,
+ UV extends Number, GV>
+ extends TtlMergingStateTestContext<S, UV, GV> {
+ @Override
+ UV generateRandomUpdate() {
+ return (UV) (Integer) RANDOM.nextInt(1000);
+ }
+ /** Sums the values of both batches, casting each value to Integer. */
+ int getIntegerMergeResult(
+ List<Tuple2<String, UV>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, UV>> finalUpdatesToMerge) {
+ return unexpiredUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum() +
+ finalUpdatesToMerge.stream().mapToInt(t -> (Integer) t.f1).sum();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
deleted file mode 100644
index bc5f67f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.List;
-
-/** Test suite for {@link TtlReducingState}. */
-public class TtlReducingStateTest
- extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlReducingState<?, String, Integer>, Integer, Integer> {
- @Override
- void initTestValues() {
- updater = v -> ttlState.add(v);
- getter = () -> ttlState.get();
- originalGetter = () -> ttlState.original.get();
-
- updateEmpty = 5;
- updateUnexpired = 7;
- updateExpired = 6;
-
- getUpdateEmpty = 5;
- getUnexpired = 12;
- getUpdateExpired = 6;
- }
-
- @Override
- TtlReducingState<?, String, Integer> createState() {
- ReducingStateDescriptor<Integer> aggregatingStateDes =
- new ReducingStateDescriptor<>("TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
- return (TtlReducingState<?, String, Integer>) wrapMockState(aggregatingStateDes);
- }
-
- @Override
- Integer getMergeResult(
- List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
- List<Tuple2<String, Integer>> finalUpdatesToMerge) {
- return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
- }
-
- private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
- if (v1 == null && v2 == null) {
- return null;
- } else if (v1 == null) {
- return v2;
- } else if (v2 == null) {
- return v1;
- } else {
- return v1 + v2;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
new file mode 100644
index 0000000..ea5e61b
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlReducingStateTestContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.List;
+
+/** Test context for {@link TtlReducingState}: supplies the descriptor, accessors and expected values used by the TTL test base. */
+class TtlReducingStateTestContext
+ extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlReducingState<?, String, Integer>, Integer, Integer> {
+ @Override
+ void initTestValues() {
+ updateEmpty = 5;
+ updateUnexpired = 7;
+ updateExpired = 6;
+ // expected reads with REDUCE = sum: 5 alone, 5 + 7 = 12, and 6 alone once the earlier value has expired
+ getUpdateEmpty = 5;
+ getUnexpired = 12;
+ getUpdateExpired = 6;
+ }
+ /** Descriptor for a reducing state over Integers that uses REDUCE; the cast adapts it to the caller-chosen type parameters. */
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new ReducingStateDescriptor<>(
+ "TtlTestReducingState", REDUCE, IntSerializer.INSTANCE);
+ }
+ /** Adds the value to the TTL reducing state. */
+ @Override
+ void update(Integer value) throws Exception {
+ ttlState.add(value);
+ }
+ /** Reads the current reduced value through the TTL state. */
+ @Override
+ Integer get() throws Exception {
+ return ttlState.get();
+ }
+ /** Reads the wrapped state directly (ttlState.original) instead of going through ttlState.get(). */
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.get();
+ }
+ /** Expected merge result: the integer sum of both update batches. */
+ @Override
+ Integer getMergeResult(
+ List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+ return getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge);
+ }
+ /** Null-tolerant sum: a null operand yields the other operand (null when both are null). */
+ private static final ReduceFunction<Integer> REDUCE = (v1, v2) -> {
+ if (v1 == null && v2 == null) {
+ return null;
+ } else if (v1 == null) {
+ return v2;
+ } else if (v2 == null) {
+ return v1;
+ } else {
+ return v1 + v2;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
index bc3d6e7..5820f13 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestBase.java
@@ -22,105 +22,153 @@ import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfiguration;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.function.SupplierWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
+import org.apache.flink.util.StateMigrationException;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Consumer;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeThat;
-abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+/** State TTL base test suite. */
+@RunWith(Parameterized.class)
+public abstract class TtlStateTestBase {
private static final long TTL = 100;
- private static final KeyedStateFactory MOCK_ORIGINAL_STATE_FACTORY = new MockKeyedStateFactory();
- S ttlState;
- MockTimeProvider timeProvider;
- StateTtlConfiguration ttlConfig;
+ private MockTtlTimeProvider timeProvider;
+ private StateBackendTestContext sbetc;
+ private StateTtlConfiguration ttlConfig;
- ThrowingConsumer<UV, Exception> updater;
- SupplierWithException<GV, Exception> getter;
- SupplierWithException<?, Exception> originalGetter;
+ @Before
+ public void setup() {
+ timeProvider = new MockTtlTimeProvider();
+ sbetc = createStateBackendTestContext(timeProvider);
+ }
- UV updateEmpty;
- UV updateUnexpired;
- UV updateExpired;
+ protected abstract StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider);
+
+ @Parameterized.Parameter
+ public TtlStateTestContextBase<?, ?, ?> ctx;
+
+ @Parameterized.Parameters(name = "{0}")
+ public static List<TtlStateTestContextBase<?, ?, ?>> testContexts() {
+ return Arrays.asList(
+ new TtlValueStateTestContext(),
+ new TtlListStateTestContext(),
+ new TtlMapStateAllEntriesTestContext(),
+ new TtlMapStatePerElementTestContext(),
+ new TtlAggregatingStateTestContext(),
+ new TtlReducingStateTestContext(),
+ new TtlFoldingStateTestContext());
+ }
- GV getUpdateEmpty;
- GV getUnexpired;
- GV getUpdateExpired;
+ @SuppressWarnings("unchecked")
+ private <S extends InternalKvState<?, String, ?>, UV> TtlStateTestContextBase<S, UV, ?> ctx() {
+ return (TtlStateTestContextBase<S, UV, ?>) ctx;
+ }
- GV emptyValue = null;
+ @SuppressWarnings("unchecked")
+ private <UV> TtlMergingStateTestContext<?, UV, ?> mctx() {
+ return (TtlMergingStateTestContext<?, UV, ?>) ctx;
+ }
- void initTest() {
+ private void initTest() throws Exception {
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
}
- private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility) {
+ private void initTest(
+ StateTtlConfiguration.TtlUpdateType updateType,
+ StateTtlConfiguration.TtlStateVisibility visibility) throws Exception {
initTest(updateType, visibility, TTL);
}
- private void initTest(StateTtlConfiguration.TtlUpdateType updateType, StateTtlConfiguration.TtlStateVisibility visibility, long ttl) {
- timeProvider = new MockTimeProvider();
- StateTtlConfiguration.Builder ttlConfigBuilder = StateTtlConfiguration.newBuilder(Time.seconds(5));
- ttlConfigBuilder.setTtlUpdateType(updateType)
- .setStateVisibility(visibility)
- .setTimeCharacteristic(StateTtlConfiguration.TtlTimeCharacteristic.ProcessingTime)
- .setTtl(Time.milliseconds(ttl));
- ttlConfig = ttlConfigBuilder.build();
- ttlState = createState();
- initTestValues();
+ private void initTest(
+ StateTtlConfiguration.TtlUpdateType updateType,
+ StateTtlConfiguration.TtlStateVisibility visibility,
+ long ttl) throws Exception {
+ ttlConfig = StateTtlConfiguration
+ .newBuilder(Time.milliseconds(ttl))
+ .setTtlUpdateType(updateType)
+ .setStateVisibility(visibility)
+ .build();
+ sbetc.createAndRestoreKeyedStateBackend();
+ sbetc.restoreSnapshot(null);
+ createState();
+ ctx().initTestValues();
}
- abstract S createState();
-
- <SV, US extends State, IS extends US> IS wrapMockState(StateDescriptor<IS, SV> stateDesc) {
- try {
- return TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
- StringSerializer.INSTANCE, stateDesc,
- MOCK_ORIGINAL_STATE_FACTORY, ttlConfig, timeProvider);
- } catch (Exception e) {
- throw new FlinkRuntimeException("Unexpected exception wrapping mock state", e);
- }
+ @SuppressWarnings("unchecked")
+ private <S extends State> void createState() throws Exception {
+ StateDescriptor<S, Object> stateDescriptor = ctx().createStateDescriptor();
+ stateDescriptor.enableTimeToLive(ttlConfig);
+ ctx().ttlState =
+ (InternalKvState<?, String, ?>) sbetc.createState(stateDescriptor, "defaultNamespace");
}
- abstract void initTestValues();
+ private void takeAndRestoreSnapshot() throws Exception {
+ KeyedStateHandle snapshot = sbetc.takeSnapshot();
+ sbetc.createAndRestoreKeyedStateBackend();
+ sbetc.restoreSnapshot(snapshot);
+ createState();
+ }
@Test
public void testNonExistentValue() throws Exception {
initTest();
- assertEquals("Non-existing state should be empty", emptyValue, getter.get());
+ assertEquals("Non-existing state should be empty", ctx().emptyValue, ctx().get());
}
@Test
public void testExactExpirationOnWrite() throws Exception {
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
+ takeAndRestoreSnapshot();
+
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 20;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- updater.accept(updateUnexpired);
+ ctx().update(ctx().updateUnexpired);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 120;
- assertEquals("Unexpired state should be available after update", getUnexpired, getter.get());
+ assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 170;
- updater.accept(updateExpired);
+ ctx().update(ctx().updateExpired);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 220;
- assertEquals("Unexpired state should be available after update", getUpdateExpired, getter.get());
+ assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 300;
- assertEquals("Expired state should be unavailable", emptyValue, getter.get());
- assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
}
@Test
@@ -128,11 +176,14 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 120;
- assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
- assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+ assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+ assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
+ assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
}
@Test
@@ -140,17 +191,23 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired);
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 120;
- assertEquals("Unexpired state should be available after read", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available after read", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 250;
- assertEquals("Expired state should be unavailable", emptyValue, getter.get());
- assertEquals("Original state should be cleared on access", emptyValue, originalGetter.get());
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ assertEquals("Original state should be cleared on access", ctx().emptyValue, ctx().getOriginal());
}
@Test
@@ -158,14 +215,18 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnReadAndWrite, StateTtlConfiguration.TtlStateVisibility.ReturnExpiredIfNotCleanedUp);
timeProvider.time = 0;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+
+ takeAndRestoreSnapshot();
timeProvider.time = 170;
- assertEquals("Expired state should be available", getUpdateEmpty, getter.get());
- assertEquals("Expired state should be cleared on access", emptyValue, getter.get());
+ assertEquals("Expired state should be available", ctx().getUpdateEmpty, ctx().get());
+ assertEquals("Expired state should be cleared on access", ctx().emptyValue, ctx().get());
}
@Test
@@ -173,9 +234,154 @@ abstract class TtlStateTestBase<S extends InternalKvState<?, String, ?>, UV, GV>
initTest(StateTtlConfiguration.TtlUpdateType.OnCreateAndWrite, StateTtlConfiguration.TtlStateVisibility.NeverReturnExpired, Long.MAX_VALUE);
timeProvider.time = 10;
- updater.accept(updateEmpty);
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 50;
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+ }
+
+ @Test
+ public void testMergeNamespaces() throws Exception {
+ assumeThat(ctx, instanceOf(TtlMergingStateTestContext.class)); // skipped for contexts that are not merging-state contexts
+
+ initTest(); // default setup: TTL = 100, OnCreateAndWrite, NeverReturnExpired
+
+ timeProvider.time = 0; // first batch: not part of the expected merge result below
+ List<Tuple2<String, Object>> expiredUpdatesToMerge = mctx().generateExpiredUpdatesToMerge();
+ mctx().applyStateUpdates(expiredUpdatesToMerge);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 120; // second batch, written after the TTL (100) has elapsed since the first batch
+ List<Tuple2<String, Object>> unexpiredUpdatesToMerge = mctx().generateUnexpiredUpdatesToMerge();
+ mctx().applyStateUpdates(unexpiredUpdatesToMerge);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 150; // final batch
+ List<Tuple2<String, Object>> finalUpdatesToMerge = mctx().generateFinalUpdatesToMerge();
+ mctx().applyStateUpdates(finalUpdatesToMerge);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 230; // merge time: only the second and final batches are expected in the result
+ mctx().ttlState.mergeNamespaces("targetNamespace", TtlMergingStateTestContext.NAMESPACES);
+ mctx().ttlState.setCurrentNamespace("targetNamespace");
+ assertEquals("Unexpected result of merge operation",
+ mctx().getMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge), mctx().get());
+ }
+
+ @Test
+ public void testMultipleKeys() throws Exception {
+ testMultipleStateIds(id -> sbetc.setCurrentKey(id)); // the ids "id1"/"id2" are used as the backend's current key
+ }
+
+ @Test
+ public void testMultipleNamespaces() throws Exception {
+ testMultipleStateIds(id -> ctx().ttlState.setCurrentNamespace(id)); // the ids "id1"/"id2" are used as the state's current namespace
+ }
+
+ private void testMultipleStateIds(Consumer<String> idChanger) throws Exception {
+ initTest();
+
+ timeProvider.time = 0;
+ idChanger.accept("id2");
+ ctx().update(ctx().updateEmpty);
+
+ takeAndRestoreSnapshot();
timeProvider.time = 50;
- assertEquals("Unexpired state should be available", getUpdateEmpty, getter.get());
+ idChanger.accept("id1");
+ ctx().update(ctx().updateEmpty);
+ idChanger.accept("id2");
+ ctx().update(ctx().updateUnexpired);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 120;
+ idChanger.accept("id1");
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+ idChanger.accept("id2");
+ assertEquals("Unexpired state should be available after update", ctx().getUnexpired, ctx().get());
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 170;
+ idChanger.accept("id2");
+ ctx().update(ctx().updateExpired);
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 230;
+ idChanger.accept("id1");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ idChanger.accept("id2");
+ assertEquals("Unexpired state should be available after update", ctx().getUpdateExpired, ctx().get());
+
+ takeAndRestoreSnapshot();
+
+ timeProvider.time = 300;
+ idChanger.accept("id1");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ idChanger.accept("id2");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ }
+
+ @Test
+ public void testSnapshotChangeRestore() throws Exception {
+ initTest(); // default setup: TTL = 100, OnCreateAndWrite, NeverReturnExpired
+
+ timeProvider.time = 0;
+ sbetc.setCurrentKey("k1");
+ ctx().update(ctx().updateEmpty);
+
+ timeProvider.time = 50;
+ sbetc.setCurrentKey("k1");
+ ctx().update(ctx().updateUnexpired);
+
+ timeProvider.time = 100;
+ sbetc.setCurrentKey("k2");
+ ctx().update(ctx().updateEmpty);
+
+ KeyedStateHandle snapshot = sbetc.takeSnapshot(); // taken with k1 last written at 50 and k2 written at 100
+
+ timeProvider.time = 170; // updates made after the snapshot; the assertions below expect them to be gone after the restore
+ sbetc.setCurrentKey("k1");
+ ctx().update(ctx().updateExpired);
+ sbetc.setCurrentKey("k2");
+ ctx().update(ctx().updateUnexpired);
+
+ sbetc.createAndRestoreKeyedStateBackend();
+ sbetc.restoreSnapshot(snapshot);
+ createState();
+
+ timeProvider.time = 180; // past k1's last snapshotted write (50) + TTL, but before k2's write (100) + TTL
+ sbetc.setCurrentKey("k1");
+ assertEquals("Expired state should be unavailable", ctx().emptyValue, ctx().get());
+ sbetc.setCurrentKey("k2");
+ assertEquals("Unexpired state should be available", ctx().getUpdateEmpty, ctx().get());
+ }
+
+ @Test(expected = StateMigrationException.class)
+ public void testRestoreTtlAndRegisterNonTtlStateCompatFailure() throws Exception {
+ assumeThat(this, not(instanceOf(MockTtlStateTest.class))); // not run for the mock-backed test class
+
+ initTest();
+
+ timeProvider.time = 0;
+ ctx().update(ctx().updateEmpty);
+
+ KeyedStateHandle snapshot = sbetc.takeSnapshot();
+ sbetc.createAndRestoreKeyedStateBackend();
+
+ sbetc.restoreSnapshot(snapshot);
+ sbetc.createState(ctx().createStateDescriptor(), ""); // fresh descriptor without enableTimeToLive: expected to throw StateMigrationException
+ }
+
+ @After
+ public void tearDown() {
+ sbetc.disposeKeyedStateBackend();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
new file mode 100644
index 0000000..d40bfb0
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlStateTestContextBase.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+
+abstract class TtlStateTestContextBase<S extends InternalKvState<?, String, ?>, UV, GV> {
+ S ttlState;
+
+ UV updateEmpty;
+ UV updateUnexpired;
+ UV updateExpired;
+
+ GV getUpdateEmpty;
+ GV getUnexpired;
+ GV getUpdateExpired;
+
+ GV emptyValue = null;
+
+ abstract void initTestValues();
+
+ abstract <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor();
+
+ abstract void update(UV value) throws Exception;
+
+ abstract GV get() throws Exception;
+
+ abstract Object getOriginal() throws Exception;
+
+ @Override
+ public String toString() {
+ return this.getClass().getSimpleName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
deleted file mode 100644
index 8d9a4b4..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlValueState}. */
-public class TtlValueStateTest extends TtlStateTestBase<TtlValueState<?, String, String>, String, String> {
- private static final String TEST_VAL1 = "test value1";
- private static final String TEST_VAL2 = "test value2";
- private static final String TEST_VAL3 = "test value3";
-
- @Override
- void initTestValues() {
- updater = v -> ttlState.update(v);
- getter = () -> ttlState.value();
- originalGetter = () -> ttlState.original.value();
-
- updateEmpty = TEST_VAL1;
- updateUnexpired = TEST_VAL2;
- updateExpired = TEST_VAL3;
-
- getUpdateEmpty = TEST_VAL1;
- getUnexpired = TEST_VAL2;
- getUpdateExpired = TEST_VAL3;
- }
-
- @Override
- TtlValueState<?, String, String> createState() {
- ValueStateDescriptor<String> valueStateDesc =
- new ValueStateDescriptor<>("TtlValueTestState", StringSerializer.INSTANCE);
- return (TtlValueState<?, String, String>) wrapMockState(valueStateDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
new file mode 100644
index 0000000..976d891
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlValueStateTestContext.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test context for {@link TtlValueState}. */
+class TtlValueStateTestContext extends TtlStateTestContextBase<TtlValueState<?, String, String>, String, String> {
+ private static final String TEST_VAL1 = "test value1";
+ private static final String TEST_VAL2 = "test value2";
+ private static final String TEST_VAL3 = "test value3";
+
+ @Override
+ void initTestValues() {
+ updateEmpty = TEST_VAL1;
+ updateUnexpired = TEST_VAL2;
+ updateExpired = TEST_VAL3;
+
+ getUpdateEmpty = TEST_VAL1;
+ getUnexpired = TEST_VAL2;
+ getUpdateExpired = TEST_VAL3;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new ValueStateDescriptor<>(
+ "TtlValueTestState", StringSerializer.INSTANCE);
+ }
+
+ @Override
+ void update(String value) throws Exception {
+ ttlState.update(value);
+ }
+
+ @Override
+ String get() throws Exception {
+ return ttlState.value();
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.value();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
index 439ca7f2..3f94669 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalKvState.java
@@ -21,14 +21,12 @@ package org.apache.flink.runtime.state.ttl.mock;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalKvState;
-import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;
/** In memory mock internal state base class. */
-class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
- private Map<N, T> namespacedValues = new HashMap<>();
- private T defaultNamespaceValue;
+abstract class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
+ Supplier<Map<Object, Object>> values;
private N currentNamespace;
private final Supplier<T> emptyValue;
@@ -38,7 +36,6 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
MockInternalKvState(Supplier<T> emptyValue) {
this.emptyValue = emptyValue;
- defaultNamespaceValue = emptyValue.get();
}
@Override
@@ -72,26 +69,20 @@ class MockInternalKvState<K, N, T> implements InternalKvState<K, N, T> {
@Override
public void clear() {
- if (currentNamespace == null) {
- defaultNamespaceValue = emptyValue.get();
- } else {
- namespacedValues.remove(currentNamespace);
- }
+ getCurrentKeyValues().remove(currentNamespace);
}
+ @SuppressWarnings("unchecked")
public T getInternal() {
- T value = currentNamespace == null ? defaultNamespaceValue :
- namespacedValues.getOrDefault(currentNamespace, emptyValue.get());
- updateInternal(value);
- return value;
+ return (T) getCurrentKeyValues().computeIfAbsent(currentNamespace, n -> emptyValue.get());
}
@SuppressWarnings("WeakerAccess")
public void updateInternal(T valueToStore) {
- if (currentNamespace == null) {
- defaultNamespaceValue = valueToStore;
- } else {
- namespacedValues.put(currentNamespace, valueToStore);
- }
+ getCurrentKeyValues().put(currentNamespace, valueToStore);
+ }
+
+ private Map<Object, Object> getCurrentKeyValues() {
+ return values.get();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
index 386ef97..9b5ac10 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockInternalMapState.java
@@ -68,21 +68,17 @@ public class MockInternalMapState<K, N, UK, UV>
@Override
public Iterable<Map.Entry<UK, UV>> entries() {
- return copy().entrySet();
- }
-
- private Map<UK, UV> copy() {
- return new HashMap<>(getInternal());
+ return getInternal().entrySet();
}
@Override
public Iterable<UK> keys() {
- return copy().keySet();
+ return getInternal().keySet();
}
@Override
public Iterable<UV> values() {
- return copy().values();
+ return getInternal().values();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
new file mode 100644
index 0000000..363ecf8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateBackend.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.KeyExtractorFunction;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
+import org.apache.flink.runtime.state.KeyedStateFactory;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PriorityComparator;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
+import org.apache.flink.runtime.state.heap.HeapPriorityQueueSet;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.RunnableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/** State backend which produces in memory mock state objects. */
+public class MockKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
+
+ @SuppressWarnings("deprecation")
+ private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
+ Stream.of(
+ Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
+ Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
+ Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
+ Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
+ Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
+ Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
+ ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
+
+ private final Map<String, Map<K, Map<Object, Object>>> stateValues = new HashMap<>();
+
+ MockKeyedStateBackend(
+ TaskKvStateRegistry kvStateRegistry,
+ TypeSerializer<K> keySerializer,
+ ClassLoader userCodeClassLoader,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ ExecutionConfig executionConfig,
+ TtlTimeProvider ttlTimeProvider) {
+ super(kvStateRegistry, keySerializer, userCodeClassLoader,
+ numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <N, SV, S extends State, IS extends S> IS createInternalState(
+ TypeSerializer<N> namespaceSerializer,
+ StateDescriptor<S, SV> stateDesc) throws Exception {
+ KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
+ if (stateFactory == null) {
+ String message = String.format("State %s is not supported by %s",
+ stateDesc.getClass(), TtlStateFactory.class);
+ throw new FlinkRuntimeException(message);
+ }
+ IS state = stateFactory.createInternalState(namespaceSerializer, stateDesc);
+ ((MockInternalKvState<K, N, SV>) state).values = () -> stateValues
+ .computeIfAbsent(stateDesc.getName(), n -> new HashMap<>())
+ .computeIfAbsent(getCurrentKey(), k -> new HashMap<>());
+ return state;
+ }
+
+ @Override
+ public int numStateEntries() {
+ int count = 0;
+ for (String state : stateValues.keySet()) {
+ for (K key : stateValues.get(state).keySet()) {
+ count += stateValues.get(state).get(key).size();
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ // noop
+ }
+
+ @Override
+ public <N> Stream<K> getKeys(String state, N namespace) {
+ return stateValues.getOrDefault(state, Collections.emptyMap()).entrySet().stream() // unknown state name => empty stream, not NPE
+ .filter(e -> e.getValue().containsKey(namespace))
+ .map(Map.Entry::getKey);
+ }
+
+ @Override
+ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
+ long checkpointId,
+ long timestamp,
+ CheckpointStreamFactory streamFactory,
+ CheckpointOptions checkpointOptions) {
+ return new FutureTask<>(() -> SnapshotResult.of(new MockKeyedStateHandle<>(copy(stateValues))));
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void restore(Collection<KeyedStateHandle> state) {
+ stateValues.clear();
+ state = state == null ? Collections.emptyList() : state;
+ state.forEach(ksh -> stateValues.putAll(copy(((MockKeyedStateHandle<K>) ksh).snapshotStates)));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static <K> Map<String, Map<K, Map<Object, Object>>> copy(
+ Map<String, Map<K, Map<Object, Object>>> stateValues) {
+ Map<String, Map<K, Map<Object, Object>>> snapshotStates = new HashMap<>();
+ for (String stateName : stateValues.keySet()) {
+ Map<K, Map<Object, Object>> keyedValues = snapshotStates.computeIfAbsent(stateName, s -> new HashMap<>());
+ for (K key : stateValues.get(stateName).keySet()) {
+ Map<Object, Object> values = keyedValues.computeIfAbsent(key, s -> new HashMap<>());
+ for (Object namespace : stateValues.get(stateName).get(key).keySet()) {
+ Object value = stateValues.get(stateName).get(key).get(namespace);
+ value = value instanceof List ? new ArrayList<>((List) value) : value;
+ value = value instanceof Map ? new HashMap<>((Map) value) : value;
+ values.put(namespace, value);
+ }
+ }
+ }
+ return snapshotStates;
+ }
+
+ @Nonnull
+ @Override
+ public <T extends HeapPriorityQueueElement> KeyGroupedInternalPriorityQueue<T>
+ create(
+ @Nonnull String stateName,
+ @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
+ @Nonnull PriorityComparator<T> elementPriorityComparator,
+ @Nonnull KeyExtractorFunction<T> keyExtractor) {
+ return new HeapPriorityQueueSet<>(
+ elementPriorityComparator,
+ keyExtractor,
+ 0,
+ keyGroupRange,
+ 0);
+ }
+
+ private static class MockKeyedStateHandle<K> implements KeyedStateHandle {
+ private static final long serialVersionUID = 1L;
+
+ final Map<String, Map<K, Map<Object, Object>>> snapshotStates;
+
+ MockKeyedStateHandle(Map<String, Map<K, Map<Object, Object>>> snapshotStates) {
+ this.snapshotStates = snapshotStates;
+ }
+
+ @Override
+ public void discardState() {
+ snapshotStates.clear();
+ }
+
+ @Override
+ public long getStateSize() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void registerSharedStates(SharedStateRegistry stateRegistry) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyGroupRange getKeyGroupRange() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
deleted file mode 100644
index d843352..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockKeyedStateFactory.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl.mock;
-
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.State;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.KeyedStateFactory;
-import org.apache.flink.runtime.state.ttl.TtlStateFactory;
-import org.apache.flink.util.FlinkRuntimeException;
-
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** State factory which produces in memory mock state objects. */
-public class MockKeyedStateFactory implements KeyedStateFactory {
- @SuppressWarnings("deprecation")
- private static final Map<Class<? extends StateDescriptor>, KeyedStateFactory> STATE_FACTORIES =
- Stream.of(
- Tuple2.of(ValueStateDescriptor.class, (KeyedStateFactory) MockInternalValueState::createState),
- Tuple2.of(ListStateDescriptor.class, (KeyedStateFactory) MockInternalListState::createState),
- Tuple2.of(MapStateDescriptor.class, (KeyedStateFactory) MockInternalMapState::createState),
- Tuple2.of(ReducingStateDescriptor.class, (KeyedStateFactory) MockInternalReducingState::createState),
- Tuple2.of(AggregatingStateDescriptor.class, (KeyedStateFactory) MockInternalAggregatingState::createState),
- Tuple2.of(FoldingStateDescriptor.class, (KeyedStateFactory) MockInternalFoldingState::createState)
- ).collect(Collectors.toMap(t -> t.f0, t -> t.f1));
-
- @Override
- public <N, SV, S extends State, IS extends S> IS createState(
- TypeSerializer<N> namespaceSerializer,
- StateDescriptor<S, SV> stateDesc) throws Exception {
- KeyedStateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
- if (stateFactory == null) {
- String message = String.format("State %s is not supported by %s",
- stateDesc.getClass(), TtlStateFactory.class);
- throw new FlinkRuntimeException(message);
- }
- return stateFactory.createState(namespaceSerializer, stateDesc);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
new file mode 100644
index 0000000..a8d49dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/mock/MockStateBackend.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl.mock;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
+
+import javax.annotation.Nullable;
+
+/** Mock state backend. */
+public class MockStateBackend extends AbstractStateBackend {
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) {
+ return new CheckpointStorage() {
+ @Override
+ public boolean supportsHighlyAvailableStorage() {
+ return false;
+ }
+
+ @Override
+ public boolean hasDefaultSavepointLocation() {
+ return false;
+ }
+
+ @Override
+ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPointer) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) {
+ return null;
+ }
+
+ @Override
+ public CheckpointStreamFactory.CheckpointStateOutputStream createTaskOwnedStateStream() {
+ return null;
+ }
+ };
+ }
+
+ @Override
+ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) {
+ return new MockKeyedStateBackend<>(
+ new KvStateRegistry().createTaskRegistry(jobID, new JobVertexID()),
+ keySerializer,
+ env.getUserClassLoader(),
+ numberOfKeyGroups,
+ keyGroupRange,
+ env.getExecutionConfig(),
+ ttlTimeProvider);
+ }
+
+ @Override
+ public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 6b4e3e9..3bf0aea 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -87,6 +87,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue;
import org.apache.flink.runtime.state.heap.TreeOrderedSetCache;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
@@ -270,10 +271,12 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
ExecutionConfig executionConfig,
boolean enableIncrementalCheckpointing,
LocalRecoveryConfig localRecoveryConfig,
- RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType
+ RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
+ TtlTimeProvider ttlTimeProvider
) throws IOException {
- super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
+ super(kvStateRegistry, keySerializer, userCodeClassLoader,
+ numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
@@ -1347,7 +1350,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, SV, S extends State, IS extends S> IS createState(
+ public <N, SV, S extends State, IS extends S> IS createInternalState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 998521b..1794e17 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.TernaryBoolean;
@@ -410,7 +411,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws IOException {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws IOException {
// first, make sure that the RocksDB JNI library is loaded
// we do this explicitly here to have better error handling
@@ -442,7 +444,8 @@ public class RocksDBStateBackend extends AbstractStateBackend implements Configu
env.getExecutionConfig(),
isIncrementalCheckpointsEnabled(),
localRecoveryConfig,
- priorityQueueStateType);
+ priorityQueueStateType,
+ ttlTimeProvider);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 69069d6..6b254ce 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
@@ -241,7 +242,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
new ExecutionConfig(),
enableIncrementalCheckpointing,
TestLocalRecoveryConfig.disabled(),
- RocksDBStateBackend.PriorityQueueStateType.HEAP);
+ RocksDBStateBackend.PriorityQueueStateType.HEAP,
+ TtlTimeProvider.DEFAULT);
verify(columnFamilyOptions, Mockito.times(1))
.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
[3/3] flink git commit: [FLINK-9701] Introduce TTL configuration in
state descriptors
Posted by sr...@apache.org.
[FLINK-9701] Introduce TTL configuration in state descriptors
This closes #6313.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f45b7f7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f45b7f7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f45b7f7f
Branch: refs/heads/master
Commit: f45b7f7ff27df019e9045895e718fa112d12139c
Parents: b407ba7
Author: Andrey Zagrebin <az...@gmail.com>
Authored: Tue Jul 3 19:23:41 2018 +0200
Committer: Stefan Richter <s....@data-artisans.com>
Committed: Thu Jul 12 20:18:23 2018 +0200
----------------------------------------------------------------------
.../flink/api/common/state/StateDescriptor.java | 28 ++
.../api/common/state/StateTtlConfiguration.java | 3 -
.../common/typeutils/CompositeSerializer.java | 56 ++-
.../KVStateRequestSerializerRocksDBTest.java | 9 +-
.../network/KvStateRequestSerializerTest.java | 9 +-
.../network/KvStateServerHandlerTest.java | 4 +-
.../state/AbstractKeyedStateBackend.java | 47 ++-
.../runtime/state/AbstractStateBackend.java | 16 +-
.../flink/runtime/state/KeyedStateFactory.java | 2 +-
.../flink/runtime/state/StateBackend.java | 39 ++-
.../state/filesystem/FsStateBackend.java | 21 +-
.../state/heap/HeapKeyedStateBackend.java | 9 +-
.../state/memory/MemoryStateBackend.java | 7 +-
.../runtime/state/ttl/AbstractTtlDecorator.java | 2 -
.../flink/runtime/state/ttl/TtlMapState.java | 97 ++++--
.../runtime/state/ttl/TtlReducingState.java | 2 +-
.../runtime/state/ttl/TtlStateFactory.java | 22 +-
.../runtime/state/ttl/TtlTimeProvider.java | 4 +-
.../CheckpointSettingsSerializableTest.java | 4 +-
...HeapKeyedStateBackendAsyncByDefaultTest.java | 4 +-
.../runtime/state/StateBackendTestBase.java | 15 +-
.../state/StateSnapshotCompressionTest.java | 17 +-
...pKeyedStateBackendSnapshotMigrationTest.java | 4 +-
.../state/heap/HeapStateBackendTestBase.java | 4 +-
.../runtime/state/ttl/HeapTtlStateTest.java | 35 ++
.../runtime/state/ttl/MockTimeProvider.java | 28 --
.../runtime/state/ttl/MockTtlStateTest.java | 35 ++
.../runtime/state/ttl/MockTtlTimeProvider.java | 28 ++
.../state/ttl/StateBackendTestContext.java | 125 +++++++
.../state/ttl/TtlAggregatingStateTest.java | 90 -----
.../ttl/TtlAggregatingStateTestContext.java | 103 ++++++
.../runtime/state/ttl/TtlFoldingStateTest.java | 54 ---
.../state/ttl/TtlFoldingStateTestContext.java | 67 ++++
.../runtime/state/ttl/TtlListStateTest.java | 75 ----
.../state/ttl/TtlListStateTestContext.java | 87 +++++
.../ttl/TtlMapStateAllEntriesTestContext.java | 66 ++++
.../state/ttl/TtlMapStatePerElementTest.java | 42 ---
.../ttl/TtlMapStatePerElementTestContext.java | 53 +++
.../runtime/state/ttl/TtlMapStateTest.java | 55 ---
.../runtime/state/ttl/TtlMapStateTestBase.java | 33 --
.../state/ttl/TtlMapStateTestContext.java | 35 ++
.../runtime/state/ttl/TtlMergingStateBase.java | 126 -------
.../state/ttl/TtlMergingStateTestContext.java | 99 ++++++
.../runtime/state/ttl/TtlReducingStateTest.java | 71 ----
.../state/ttl/TtlReducingStateTestContext.java | 84 +++++
.../runtime/state/ttl/TtlStateTestBase.java | 340 +++++++++++++++----
.../state/ttl/TtlStateTestContextBase.java | 52 +++
.../runtime/state/ttl/TtlValueStateTest.java | 51 ---
.../state/ttl/TtlValueStateTestContext.java | 64 ++++
.../state/ttl/mock/MockInternalKvState.java | 29 +-
.../state/ttl/mock/MockInternalMapState.java | 10 +-
.../state/ttl/mock/MockKeyedStateBackend.java | 218 ++++++++++++
.../state/ttl/mock/MockKeyedStateFactory.java | 64 ----
.../state/ttl/mock/MockStateBackend.java | 111 ++++++
.../state/RocksDBKeyedStateBackend.java | 9 +-
.../streaming/state/RocksDBStateBackend.java | 7 +-
.../state/RocksDBStateBackendTest.java | 4 +-
.../streaming/state/RocksDBTtlStateTest.java | 62 ++++
.../StreamTaskStateInitializerImpl.java | 4 +-
.../StreamTaskStateInitializerImplTest.java | 4 +-
.../tasks/StreamTaskTerminationTest.java | 4 +-
.../tasks/TestSpyWrapperStateBackend.java | 7 +-
.../streaming/runtime/StateBackendITCase.java | 16 +-
63 files changed, 1933 insertions(+), 939 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
index 6c54e71..956fd05 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.common.state;
+import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -92,6 +93,10 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
@Nullable
private String queryableStateName;
+ /** TTL configuration for state created from this StateDescriptor, or {@code null} if TTL is not enabled. */
+ @Nullable
+ private StateTtlConfiguration ttlConfig;
+
/** The default value returned by the state when no other value is bound to a key. */
@Nullable
protected transient T defaultValue;
@@ -203,6 +208,8 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
* @throws IllegalStateException If queryable state name already set
*/
public void setQueryable(String queryableStateName) {
+ Preconditions.checkArgument(ttlConfig == null,
+ "Queryable state is currently not supported with TTL");
if (this.queryableStateName == null) {
this.queryableStateName = Preconditions.checkNotNull(queryableStateName, "Registration name");
} else {
@@ -230,6 +237,27 @@ public abstract class StateDescriptor<S extends State, T> implements Serializabl
return queryableStateName != null;
}
+ /**
+ * Configures optional activation of state time-to-live (TTL).
+ *
+ * <p>State user value will expire, become unavailable and be cleaned up in storage
+ * depending on configured {@link StateTtlConfiguration}.
+ *
+ * @param ttlConfig configuration of state TTL
+ */
+ public void enableTimeToLive(StateTtlConfiguration ttlConfig) {
+ Preconditions.checkNotNull(ttlConfig);
+ Preconditions.checkArgument(queryableStateName == null,
+ "Queryable state is currently not supported with TTL");
+ this.ttlConfig = ttlConfig;
+ }
+
+ @Nullable
+ @Internal
+ public StateTtlConfiguration getTtlConfig() {
+ return ttlConfig;
+ }
+
// ------------------------------------------------------------------------
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
index 8ef2046..9bd8b15 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfiguration.java
@@ -27,15 +27,12 @@ import static org.apache.flink.api.common.state.StateTtlConfiguration.TtlUpdateT
/**
* Configuration of state TTL logic.
- * TODO: builder
*/
public class StateTtlConfiguration {
/**
* This option value configures when to update last access timestamp which prolongs state TTL.
*/
public enum TtlUpdateType {
- /** TTL is disabled. State does not expire. */
- Disabled,
/** Last access timestamp is initialised when state is created and updated on every write operation. */
OnCreateAndWrite,
/** The same as <code>OnCreateAndWrite</code> but also updated on read. */
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
index 43c5533..2db7a30 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/CompositeSerializer.java
@@ -50,7 +50,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
@SuppressWarnings("unchecked")
protected CompositeSerializer(boolean immutableTargetType, TypeSerializer<?> ... fieldSerializers) {
this(
- new PrecomputedParameters(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers),
+ PrecomputedParameters.precompute(immutableTargetType, (TypeSerializer<Object>[]) fieldSerializers),
fieldSerializers);
}
@@ -187,6 +187,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
return 31 * Boolean.hashCode(precomputed.immutableTargetType) + Arrays.hashCode(fieldSerializers);
}
+ @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
@Override
public boolean equals(Object obj) {
if (canEqual(obj)) {
@@ -205,17 +206,12 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
- return new CompositeTypeSerializerConfigSnapshot(fieldSerializers) {
- @Override
- public int getVersion() {
- return 0;
- }
- };
+ return new ConfigSnapshot(fieldSerializers);
}
@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
- if (configSnapshot instanceof CompositeTypeSerializerConfigSnapshot) {
+ if (configSnapshot instanceof ConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> previousSerializersAndConfigs =
((CompositeTypeSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
if (previousSerializersAndConfigs.size() == fieldSerializers.length) {
@@ -242,11 +238,7 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
}
}
}
- PrecomputedParameters precomputed =
- new PrecomputedParameters(this.precomputed.immutableTargetType, convertSerializers);
- return requiresMigration ?
- CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers)) :
- CompatibilityResult.compatible();
+ return requiresMigration ? createMigrationCompatResult(convertSerializers) : CompatibilityResult.compatible();
}
private CompatibilityResult<Object> resolveFieldCompatibility(
@@ -256,6 +248,12 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
previousSerializersAndConfigs.get(index).f1, fieldSerializers[index]);
}
+ private CompatibilityResult<T> createMigrationCompatResult(TypeSerializer<Object>[] convertSerializers) {
+ PrecomputedParameters precomputed =
+ PrecomputedParameters.precompute(this.precomputed.immutableTargetType, convertSerializers);
+ return CompatibilityResult.requiresMigration(createSerializerInstance(precomputed, convertSerializers));
+ }
+
/** This class holds composite serializer parameters which can be precomputed in advanced for better performance. */
protected static class PrecomputedParameters implements Serializable {
private static final long serialVersionUID = 1L;
@@ -272,7 +270,14 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
/** Whether any field serializer is stateful. */
final boolean stateful;
- PrecomputedParameters(
+ private PrecomputedParameters(boolean immutableTargetType, boolean immutable, int length, boolean stateful) {
+ this.immutableTargetType = immutableTargetType;
+ this.immutable = immutable;
+ this.length = length;
+ this.stateful = stateful;
+ }
+
+ static PrecomputedParameters precompute(
boolean immutableTargetType,
TypeSerializer<Object>[] fieldSerializers) {
Preconditions.checkNotNull(fieldSerializers);
@@ -292,11 +297,26 @@ public abstract class CompositeSerializer<T> extends TypeSerializer<T> {
}
totalLength = totalLength >= 0 ? totalLength + fieldSerializer.getLength() : totalLength;
}
+ return new PrecomputedParameters(immutableTargetType, fieldsImmutable, totalLength, stateful);
+ }
+ }
- this.immutableTargetType = immutableTargetType;
- this.immutable = immutableTargetType && fieldsImmutable;
- this.length = totalLength;
- this.stateful = stateful;
+ /** Snapshot field serializers of composite type. */
+ public static class ConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
+ private static final int VERSION = 0;
+
+ /** This empty nullary constructor is required for deserializing the configuration. */
+ @SuppressWarnings("unused")
+ public ConfigSnapshot() {
+ }
+
+ ConfigSnapshot(@Nonnull TypeSerializer<?>... nestedSerializers) {
+ super(nestedSerializers);
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
index 9ea3198..176e55a 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KVStateRequestSerializerRocksDBTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.junit.Rule;
import org.junit.Test;
@@ -80,12 +81,13 @@ public final class KVStateRequestSerializerRocksDBTest {
new ExecutionConfig(),
false,
TestLocalRecoveryConfig.disabled(),
- RocksDBStateBackend.PriorityQueueStateType.HEAP
+ RocksDBStateBackend.PriorityQueueStateType.HEAP,
+ TtlTimeProvider.DEFAULT
);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(VoidNamespaceSerializer.INSTANCE,
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
KvStateRequestSerializerTest.testListSerialization(key, listState);
@@ -121,7 +123,8 @@ public final class KVStateRequestSerializerRocksDBTest {
new ExecutionConfig(),
false,
TestLocalRecoveryConfig.disabled(),
- RocksDBStateBackend.PriorityQueueStateType.HEAP);
+ RocksDBStateBackend.PriorityQueueStateType.HEAP,
+ TtlTimeProvider.DEFAULT);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
index 73f8831..d539066 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateRequestSerializerTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -198,11 +199,12 @@ public class KvStateRequestSerializerTest {
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
- new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
+ new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
+ TtlTimeProvider.DEFAULT
);
longHeapKeyedStateBackend.setCurrentKey(key);
- final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createState(
+ final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
@@ -306,7 +308,8 @@ public class KvStateRequestSerializerTest {
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
- new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128)
+ new HeapPriorityQueueSetFactory(keyGroupRange, keyGroupRange.getNumberOfKeyGroups(), 128),
+ TtlTimeProvider.DEFAULT
);
longHeapKeyedStateBackend.setCurrentKey(key);
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 9947dac..adcf3ae 100644
--- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -52,6 +52,7 @@ import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TestLogger;
import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
@@ -761,6 +762,7 @@ public class KvStateServerHandlerTest extends TestLogger {
IntSerializer.INSTANCE,
numKeyGroups,
new KeyGroupRange(0, 0),
- registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()));
+ registry.createTaskRegistry(dummyEnv.getJobID(), dummyEnv.getJobVertexId()),
+ TtlTimeProvider.DEFAULT);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 8ce25b6..c7f1bd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -27,6 +27,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.runtime.state.ttl.TtlStateFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
@@ -84,6 +86,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
private final ExecutionConfig executionConfig;
+ private final TtlTimeProvider ttlTimeProvider;
+
/** Decorates the input and output streams to write key-groups compressed. */
protected final StreamCompressionDecorator keyGroupCompressionDecorator;
@@ -93,7 +97,8 @@ public abstract class AbstractKeyedStateBackend<K> implements
ClassLoader userCodeClassLoader,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- ExecutionConfig executionConfig) {
+ ExecutionConfig executionConfig,
+ TtlTimeProvider ttlTimeProvider) {
this.kvStateRegistry = kvStateRegistry;
this.keySerializer = Preconditions.checkNotNull(keySerializer);
@@ -104,6 +109,7 @@ public abstract class AbstractKeyedStateBackend<K> implements
this.keyValueStatesByName = new HashMap<>();
this.executionConfig = executionConfig;
this.keyGroupCompressionDecorator = determineStreamCompression(executionConfig);
+ this.ttlTimeProvider = Preconditions.checkNotNull(ttlTimeProvider);
}
private StreamCompressionDecorator determineStreamCompression(ExecutionConfig executionConfig) {
@@ -220,40 +226,33 @@ public abstract class AbstractKeyedStateBackend<K> implements
public <N, S extends State, V> S getOrCreateKeyedState(
final TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, V> stateDescriptor) throws Exception {
-
checkNotNull(namespaceSerializer, "Namespace serializer");
+ checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
+ "This operation cannot use partitioned state.");
- if (keySerializer == null) {
- throw new UnsupportedOperationException(
- "State key serializer has not been configured in the config. " +
- "This operation cannot use partitioned state.");
- }
-
- if (!stateDescriptor.isSerializerInitialized()) {
- stateDescriptor.initializeSerializerUnlessSet(executionConfig);
- }
-
- InternalKvState<K, ?, ?> existing = keyValueStatesByName.get(stateDescriptor.getName());
- if (existing != null) {
- @SuppressWarnings("unchecked")
- S typedState = (S) existing;
- return typedState;
+ InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
+ if (kvState == null) {
+ if (!stateDescriptor.isSerializerInitialized()) {
+ stateDescriptor.initializeSerializerUnlessSet(executionConfig);
+ }
+ kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
+ namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
+ keyValueStatesByName.put(stateDescriptor.getName(), kvState);
+ publishQueryableStateIfEnabled(stateDescriptor, kvState);
}
+ return (S) kvState;
+ }
- InternalKvState<K, N, ?> kvState = createState(namespaceSerializer, stateDescriptor);
- keyValueStatesByName.put(stateDescriptor.getName(), kvState);
-
- // Publish queryable state
+ private void publishQueryableStateIfEnabled(
+ StateDescriptor<?, ?> stateDescriptor,
+ InternalKvState<?, ?, ?> kvState) {
if (stateDescriptor.isQueryable()) {
if (kvStateRegistry == null) {
throw new IllegalStateException("State backend has not been initialized for job.");
}
-
String name = stateDescriptor.getQueryableStateName();
kvStateRegistry.registerKvState(keyGroupRange, name, kvState);
}
-
- return (S) kvState;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 7e9c357..d397a88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import java.io.IOException;
@@ -42,13 +43,14 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
@Override
public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
- Environment env,
- JobID jobID,
- String operatorIdentifier,
- TypeSerializer<K> keySerializer,
- int numberOfKeyGroups,
- KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws IOException;
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws IOException;
@Override
public abstract OperatorStateBackend createOperatorStateBackend(
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
index dd251bd..de35979 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFactory.java
@@ -36,7 +36,7 @@ public interface KeyedStateFactory {
* @param <S> The type of the public API state.
* @param <IS> The type of internal state.
*/
- <N, SV, S extends State, IS extends S> IS createState(
+ <N, SV, S extends State, IS extends S> IS createInternalState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index f34cd9b..2775b71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import java.io.IOException;
@@ -117,7 +118,7 @@ public interface StateBackend extends java.io.Serializable {
/**
* Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
- * and checkpointing it.
+ * and checkpointing it. Uses default TTL time provider.
*
* <p><i>Keyed State</i> is state where each value is bound to a key.
*
@@ -127,14 +128,46 @@ public interface StateBackend extends java.io.Serializable {
*
* @throws Exception This method may forward all exceptions that occur while instantiating the backend.
*/
- <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ default <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception;
+ TaskKvStateRegistry kvStateRegistry) throws Exception {
+ return createKeyedStateBackend(
+ env,
+ jobID,
+ operatorIdentifier,
+ keySerializer,
+ numberOfKeyGroups,
+ keyGroupRange,
+ kvStateRegistry,
+ TtlTimeProvider.DEFAULT);
+ }
+
+ /**
+ * Creates a new {@link AbstractKeyedStateBackend} that is responsible for holding <b>keyed state</b>
+ * and checkpointing it.
+ *
+ * <p><i>Keyed State</i> is state where each value is bound to a key.
+ *
+ * @param <K> The type of the keys by which the state is organized.
+ *
+ * @return The Keyed State Backend for the given job, operator, and key group range.
+ *
+ * @throws Exception This method may forward all exceptions that occur while instantiating the backend.
+ */
+ <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws Exception;
/**
* Creates a new {@link OperatorStateBackend} that can be used for storing operator state.
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index ad1581b..f5a86e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.LoggerFactory;
@@ -93,7 +94,7 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
private static final long serialVersionUID = -8191916350224044011L;
/** Maximum size of state that is stored with the metadata, rather than in files (1 MiByte). */
- public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+ private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
// ------------------------------------------------------------------------
@@ -448,13 +449,14 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
- Environment env,
- JobID jobID,
- String operatorIdentifier,
- TypeSerializer<K> keySerializer,
- int numberOfKeyGroups,
- KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) {
+ Environment env,
+ JobID jobID,
+ String operatorIdentifier,
+ TypeSerializer<K> keySerializer,
+ int numberOfKeyGroups,
+ KeyGroupRange keyGroupRange,
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) {
TaskStateManager taskStateManager = env.getTaskStateManager();
LocalRecoveryConfig localRecoveryConfig = taskStateManager.createLocalRecoveryConfig();
@@ -470,7 +472,8 @@ public class FsStateBackend extends AbstractFileStateBackend implements Configur
isUsingAsynchronousSnapshots(),
env.getExecutionConfig(),
localRecoveryConfig,
- priorityQueueSetFactory);
+ priorityQueueSetFactory,
+ ttlTimeProvider);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 562c93d..495dfe0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -63,6 +63,7 @@ import org.apache.flink.runtime.state.StateSnapshot;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
@@ -171,9 +172,11 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
boolean asynchronousSnapshots,
ExecutionConfig executionConfig,
LocalRecoveryConfig localRecoveryConfig,
- PriorityQueueSetFactory priorityQueueSetFactory) {
+ PriorityQueueSetFactory priorityQueueSetFactory,
+ TtlTimeProvider ttlTimeProvider) {
- super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
+ super(kvStateRegistry, keySerializer, userCodeClassLoader,
+ numberOfKeyGroups, keyGroupRange, executionConfig, ttlTimeProvider);
this.localRecoveryConfig = Preconditions.checkNotNull(localRecoveryConfig);
SnapshotStrategySynchronicityBehavior<K> synchronicityTrait = asynchronousSnapshots ?
@@ -241,7 +244,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
}
@Override
- public <N, SV, S extends State, IS extends S> IS createState(
+ public <N, SV, S extends State, IS extends S> IS createInternalState(
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc) throws Exception {
StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index d78944c..1c464d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filesystem.AbstractFileStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TernaryBoolean;
import javax.annotation.Nullable;
@@ -307,7 +308,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) {
TaskStateManager taskStateManager = env.getTaskStateManager();
HeapPriorityQueueSetFactory priorityQueueSetFactory =
@@ -321,7 +323,8 @@ public class MemoryStateBackend extends AbstractFileStateBackend implements Conf
isUsingAsynchronousSnapshots(),
env.getExecutionConfig(),
taskStateManager.createLocalRecoveryConfig(),
- priorityQueueSetFactory);
+ priorityQueueSetFactory,
+ ttlTimeProvider);
}
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
index 29a575a..1b72c54 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/AbstractTtlDecorator.java
@@ -55,8 +55,6 @@ abstract class AbstractTtlDecorator<T> {
Preconditions.checkNotNull(original);
Preconditions.checkNotNull(config);
Preconditions.checkNotNull(timeProvider);
- Preconditions.checkArgument(config.getTtlUpdateType() != StateTtlConfiguration.TtlUpdateType.Disabled,
- "State does not need to be wrapped with TTL if it is configured as disabled.");
this.original = original;
this.config = config;
this.timeProvider = timeProvider;
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index 98d7c52..21145e5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -23,13 +23,15 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.util.FlinkRuntimeException;
+import javax.annotation.Nonnull;
+
import java.util.AbstractMap;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
/**
* This class wraps map state with TTL logic.
@@ -84,52 +86,89 @@ class TtlMapState<K, N, UK, UV>
@Override
public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
- return entriesStream()::iterator;
+ return entries(e -> e);
}
- private Stream<Map.Entry<UK, UV>> entriesStream() throws Exception {
+ private <R> Iterable<R> entries(
+ Function<Map.Entry<UK, UV>, R> resultMapper) throws Exception {
Iterable<Map.Entry<UK, TtlValue<UV>>> withTs = original.entries();
- withTs = withTs == null ? Collections.emptyList() : withTs;
- return StreamSupport
- .stream(withTs.spliterator(), false)
- .filter(this::unexpiredAndUpdateOrCleanup)
- .map(TtlMapState::unwrapWithoutTs);
- }
-
- private boolean unexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
- UV unexpiredValue;
- try {
- unexpiredValue = getWithTtlCheckAndUpdate(
- e::getValue,
- v -> original.put(e.getKey(), v),
- () -> original.remove(e.getKey()));
- } catch (Exception ex) {
- throw new FlinkRuntimeException(ex);
- }
- return unexpiredValue != null;
- }
-
- private static <UK, UV> Map.Entry<UK, UV> unwrapWithoutTs(Map.Entry<UK, TtlValue<UV>> e) {
- return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().getUserValue());
+ return () -> new EntriesIterator<>(withTs == null ? Collections.emptyList() : withTs, resultMapper);
}
@Override
public Iterable<UK> keys() throws Exception {
- return entriesStream().map(Map.Entry::getKey)::iterator;
+ return entries(Map.Entry::getKey);
}
@Override
public Iterable<UV> values() throws Exception {
- return entriesStream().map(Map.Entry::getValue)::iterator;
+ return entries(Map.Entry::getValue);
}
@Override
public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
- return entriesStream().iterator();
+ return entries().iterator();
}
@Override
public void clear() {
original.clear();
}
+
+ private class EntriesIterator<R> implements Iterator<R> {
+ private final Iterator<Map.Entry<UK, TtlValue<UV>>> originalIterator;
+ private final Function<Map.Entry<UK, UV>, R> resultMapper;
+ private Map.Entry<UK, UV> nextUnexpired = null;
+ private boolean rightAfterNextIsCalled = false;
+
+ private EntriesIterator(
+ @Nonnull Iterable<Map.Entry<UK, TtlValue<UV>>> withTs,
+ @Nonnull Function<Map.Entry<UK, UV>, R> resultMapper) {
+ this.originalIterator = withTs.iterator();
+ this.resultMapper = resultMapper;
+ }
+
+ @Override
+ public boolean hasNext() {
+ rightAfterNextIsCalled = false;
+ while (nextUnexpired == null && originalIterator.hasNext()) {
+ nextUnexpired = getUnexpiredAndUpdateOrCleanup(originalIterator.next());
+ }
+ return nextUnexpired != null;
+ }
+
+ @Override
+ public R next() {
+ if (hasNext()) {
+ rightAfterNextIsCalled = true;
+ R result = resultMapper.apply(nextUnexpired);
+ nextUnexpired = null;
+ return result;
+ }
+ throw new NoSuchElementException();
+ }
+
+ @Override
+ public void remove() {
+ if (rightAfterNextIsCalled) {
+ originalIterator.remove();
+ } else {
+ throw new IllegalStateException("next() has not been called or hasNext() has been called afterwards," +
+ " remove() is supported only right after calling next()");
+ }
+ }
+
+ private Map.Entry<UK, UV> getUnexpiredAndUpdateOrCleanup(Map.Entry<UK, TtlValue<UV>> e) {
+ UV unexpiredValue;
+ try {
+ unexpiredValue = getWithTtlCheckAndUpdate(
+ e::getValue,
+ v -> original.put(e.getKey(), v),
+ originalIterator::remove);
+ } catch (Exception ex) {
+ throw new FlinkRuntimeException(ex);
+ }
+ return unexpiredValue == null ? null : new AbstractMap.SimpleEntry<>(e.getKey(), unexpiredValue);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
index 01e4be9..c0aa465 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlReducingState.java
@@ -49,7 +49,7 @@ class TtlReducingState<K, N, T>
@Override
public void add(T value) throws Exception {
- original.add(wrapWithTs(value, Long.MAX_VALUE));
+ original.add(wrapWithTs(value));
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
index 82096a6..5909ac7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlStateFactory.java
@@ -49,16 +49,14 @@ public class TtlStateFactory {
TypeSerializer<N> namespaceSerializer,
StateDescriptor<S, SV> stateDesc,
KeyedStateFactory originalStateFactory,
- StateTtlConfiguration ttlConfig,
TtlTimeProvider timeProvider) throws Exception {
Preconditions.checkNotNull(namespaceSerializer);
Preconditions.checkNotNull(stateDesc);
Preconditions.checkNotNull(originalStateFactory);
- Preconditions.checkNotNull(ttlConfig);
Preconditions.checkNotNull(timeProvider);
- return ttlConfig.getTtlUpdateType() == StateTtlConfiguration.TtlUpdateType.Disabled ?
- originalStateFactory.createState(namespaceSerializer, stateDesc) :
- new TtlStateFactory(originalStateFactory, ttlConfig, timeProvider)
+ return stateDesc.getTtlConfig() == null ?
+ originalStateFactory.createInternalState(namespaceSerializer, stateDesc) :
+ new TtlStateFactory(originalStateFactory, stateDesc.getTtlConfig(), timeProvider)
.createState(namespaceSerializer, stateDesc);
}
@@ -96,7 +94,7 @@ public class TtlStateFactory {
stateDesc.getClass(), TtlStateFactory.class);
throw new FlinkRuntimeException(message);
}
- return stateFactory.createState(namespaceSerializer, stateDesc);
+ return stateFactory.createInternalState(namespaceSerializer, stateDesc);
}
@SuppressWarnings("unchecked")
@@ -106,7 +104,7 @@ public class TtlStateFactory {
ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlValueState<>(
- originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+ originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
ttlConfig, timeProvider, stateDesc.getSerializer());
}
@@ -118,7 +116,7 @@ public class TtlStateFactory {
ListStateDescriptor<TtlValue<T>> ttlDescriptor = new ListStateDescriptor<>(
stateDesc.getName(), new TtlSerializer<>(listStateDesc.getElementSerializer()));
return (IS) new TtlListState<>(
- originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+ originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
ttlConfig, timeProvider, listStateDesc.getSerializer());
}
@@ -132,7 +130,7 @@ public class TtlStateFactory {
mapStateDesc.getKeySerializer(),
new TtlSerializer<>(mapStateDesc.getValueSerializer()));
return (IS) new TtlMapState<>(
- originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+ originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
ttlConfig, timeProvider, mapStateDesc.getSerializer());
}
@@ -146,7 +144,7 @@ public class TtlStateFactory {
new TtlReduceFunction<>(reducingStateDesc.getReduceFunction(), ttlConfig, timeProvider),
new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlReducingState<>(
- originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+ originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
ttlConfig, timeProvider, stateDesc.getSerializer());
}
@@ -161,7 +159,7 @@ public class TtlStateFactory {
AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor = new AggregatingStateDescriptor<>(
stateDesc.getName(), ttlAggregateFunction, new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlAggregatingState<>(
- originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+ originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
ttlConfig, timeProvider, stateDesc.getSerializer(), ttlAggregateFunction);
}
@@ -178,7 +176,7 @@ public class TtlStateFactory {
new TtlFoldFunction<>(foldingStateDescriptor.getFoldFunction(), ttlConfig, timeProvider, initAcc),
new TtlSerializer<>(stateDesc.getSerializer()));
return (IS) new TtlFoldingState<>(
- originalStateFactory.createState(namespaceSerializer, ttlDescriptor),
+ originalStateFactory.createInternalState(namespaceSerializer, ttlDescriptor),
ttlConfig, timeProvider, stateDesc.getSerializer());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
index bac9d36..84809cc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlTimeProvider.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.state.ttl;
/**
* Provides time to TTL logic to judge about state expiration.
*/
-interface TtlTimeProvider {
+public interface TtlTimeProvider {
+ TtlTimeProvider DEFAULT = System::currentTimeMillis;
+
long currentTimestamp();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 9456f10..ffebc52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
@@ -168,7 +169,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger {
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
- TaskKvStateRegistry kvStateRegistry) throws Exception {
+ TaskKvStateRegistry kvStateRegistry,
+ TtlTimeProvider ttlTimeProvider) throws Exception {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
index 1325431..6424a7a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/HeapKeyedStateBackendAsyncByDefaultTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.junit.Rule;
@@ -70,7 +71,8 @@ public class HeapKeyedStateBackendAsyncByDefaultTest {
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
- null
+ null,
+ TtlTimeProvider.DEFAULT
);
assertTrue(keyedStateBackend.supportsAsynchronousSnapshots());
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 3c5756b..bfdc05d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -73,6 +73,7 @@ import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.testutils.ArtificialCNFExceptionThrowingClassLoader;
import org.apache.flink.types.IntValue;
@@ -182,7 +183,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
keySerializer,
numberOfKeyGroups,
keyGroupRange,
- env.getTaskKvStateRegistry());
+ env.getTaskKvStateRegistry(),
+ TtlTimeProvider.DEFAULT);
backend.restore(null);
@@ -219,7 +221,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
keySerializer,
numberOfKeyGroups,
keyGroupRange,
- env.getTaskKvStateRegistry());
+ env.getTaskKvStateRegistry(),
+ TtlTimeProvider.DEFAULT);
backend.restore(new StateObjectCollection<>(state));
@@ -3545,7 +3548,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
}
// insert some data to the backend.
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3602,7 +3605,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
try {
backend = createKeyedBackend(IntSerializer.INSTANCE);
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3649,7 +3652,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
try {
backend = restoreKeyedBackend(IntSerializer.INSTANCE, stateHandle);
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
@@ -3791,7 +3794,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
return;
}
- InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createState(
+ InternalValueState<Integer, VoidNamespace, Integer> valueState = backend.createInternalState(
VoidNamespaceSerializer.INSTANCE,
new ValueStateDescriptor<>("test", IntSerializer.INSTANCE));
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index dfcdffc..558f629 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TestLogger;
import org.apache.commons.io.IOUtils;
@@ -54,7 +55,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class));
+ mock(PriorityQueueSetFactory.class),
+ TtlTimeProvider.DEFAULT);
try {
Assert.assertTrue(
@@ -77,7 +79,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class));
+ mock(PriorityQueueSetFactory.class),
+ TtlTimeProvider.DEFAULT);
try {
Assert.assertTrue(
@@ -118,12 +121,13 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class));
+ mock(PriorityQueueSetFactory.class),
+ TtlTimeProvider.DEFAULT);
try {
InternalValueState<String, VoidNamespace, String> state =
- stateBackend.createState(new VoidNamespaceSerializer(), stateDescriptor);
+ stateBackend.createInternalState(new VoidNamespaceSerializer(), stateDescriptor);
stateBackend.setCurrentKey("A");
state.setCurrentNamespace(VoidNamespace.INSTANCE);
@@ -160,12 +164,13 @@ public class StateSnapshotCompressionTest extends TestLogger {
true,
executionConfig,
TestLocalRecoveryConfig.disabled(),
- mock(PriorityQueueSetFactory.class));
+ mock(PriorityQueueSetFactory.class),
+ TtlTimeProvider.DEFAULT);
try {
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
- InternalValueState<String, VoidNamespace, String> state = stateBackend.createState(
+ InternalValueState<String, VoidNamespace, String> state = stateBackend.createInternalState(
new VoidNamespaceSerializer(),
stateDescriptor);
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
index 249d0c3..7b8d69f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackendSnapshotMigrationTest.java
@@ -73,7 +73,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
keyedBackend.restore(StateObjectCollection.singleton(stateHandles.getJobManagerOwnedSnapshot()));
- InternalMapState<String, Integer, Long, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
+ InternalMapState<String, Integer, Long, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
keyedBackend.setCurrentKey("abc");
state.setCurrentNamespace(namespace1);
@@ -233,7 +233,7 @@ public class HeapKeyedStateBackendSnapshotMigrationTest extends HeapStateBackend
final ListStateDescriptor<Long> stateDescr = new ListStateDescriptor<>("my-state", Long.class);
stateDescr.initializeSerializerUnlessSet(new ExecutionConfig());
- InternalListState<String, Integer, Long> state = keyedBackend.createState(IntSerializer.INSTANCE, stateDescr);
+ InternalListState<String, Integer, Long> state = keyedBackend.createInternalState(IntSerializer.INSTANCE, stateDescr);
assertEquals(7, keyedBackend.numStateEntries());
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
index cf6aef4..0eddf3c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/heap/HeapStateBackendTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
+import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -61,6 +62,7 @@ public abstract class HeapStateBackendTestBase {
async,
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
- new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128));
+ new HeapPriorityQueueSetFactory(keyGroupRange, numKeyGroups, 128),
+ TtlTimeProvider.DEFAULT);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
new file mode 100644
index 0000000..06de4be
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/HeapTtlStateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+
+/** Test suite for heap state TTL. */
+public class HeapTtlStateTest extends TtlStateTestBase {
+ @Override
+ protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
+ return new StateBackendTestContext(timeProvider) {
+ @Override
+ protected StateBackend createStateBackend() {
+ return new MemoryStateBackend(false);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
deleted file mode 100644
index e14c3f8..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTimeProvider.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-class MockTimeProvider implements TtlTimeProvider {
- long time = 0;
-
- @Override
- public long currentTimestamp() {
- return time;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
new file mode 100644
index 0000000..392bdba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlStateTest.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.ttl.mock.MockStateBackend;
+
+/** Test suite for mock state TTL: plugs a {@link MockStateBackend} into the tests inherited from {@link TtlStateTestBase}. */
+public class MockTtlStateTest extends TtlStateTestBase {
+ @Override
+ protected StateBackendTestContext createStateBackendTestContext(TtlTimeProvider timeProvider) {
+ return new StateBackendTestContext(timeProvider) {
+ @Override
+ protected StateBackend createStateBackend() {
+ return new MockStateBackend();
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
new file mode 100644
index 0000000..f980043
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/MockTtlTimeProvider.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+class MockTtlTimeProvider implements TtlTimeProvider { // test clock: reports whatever value 'time' currently holds
+ long time = 0; // package-private and non-final; presumably assigned directly by tests to move the clock - confirm
+
+ @Override
+ public long currentTimestamp() {
+ return time;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
new file mode 100644
index 0000000..eaec234
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/StateBackendTestContext.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.internal.InternalKvState;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.RunnableFuture;
+
+/** Base class for state backend test context. */
+public abstract class StateBackendTestContext {
+ private final StateBackend stateBackend;
+ private final CheckpointStorageLocation checkpointStorageLocation;
+ private final TtlTimeProvider timeProvider;
+
+ private AbstractKeyedStateBackend<String> keyedStateBackend;
+
+ protected StateBackendTestContext(TtlTimeProvider timeProvider) {
+ this.timeProvider = Preconditions.checkNotNull(timeProvider);
+ this.stateBackend = Preconditions.checkNotNull(createStateBackend());
+ this.checkpointStorageLocation = createCheckpointStorageLocation();
+ }
+
+ protected abstract StateBackend createStateBackend();
+
+ private CheckpointStorageLocation createCheckpointStorageLocation() {
+ try {
+ return stateBackend
+ .createCheckpointStorage(new JobID())
+ .initializeLocationForCheckpoint(2L);
+ } catch (IOException e) {
+ throw new RuntimeException("unexpected");
+ }
+ }
+
+ void createAndRestoreKeyedStateBackend() {
+ Environment env = new DummyEnvironment();
+ try {
+ disposeKeyedStateBackend();
+ keyedStateBackend = stateBackend.createKeyedStateBackend(
+ env, new JobID(), "test", StringSerializer.INSTANCE, 10,
+ new KeyGroupRange(0, 9), env.getTaskKvStateRegistry(), timeProvider);
+ keyedStateBackend.setCurrentKey("defaultKey");
+ } catch (Exception e) {
+ throw new RuntimeException("unexpected");
+ }
+ }
+
+ void disposeKeyedStateBackend() {
+ if (keyedStateBackend != null) {
+ keyedStateBackend.dispose();
+ keyedStateBackend = null;
+ }
+ }
+
+ @Nonnull
+ KeyedStateHandle takeSnapshot() throws Exception {
+ RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture =
+ keyedStateBackend.snapshot(682375462392L, 10L,
+ checkpointStorageLocation, CheckpointOptions.forCheckpointWithDefaultLocation());
+ if (!snapshotRunnableFuture.isDone()) {
+ snapshotRunnableFuture.run();
+ }
+ return snapshotRunnableFuture.get().getJobManagerOwnedSnapshot();
+ }
+
+ void restoreSnapshot(@Nullable KeyedStateHandle snapshot) throws Exception {
+ Collection<KeyedStateHandle> restoreState =
+ snapshot == null ? null : new StateObjectCollection<>(Collections.singleton(snapshot));
+ keyedStateBackend.restore(restoreState);
+ if (snapshot != null) {
+ snapshot.discardState();
+ }
+ }
+
+ void setCurrentKey(String key) {
+ Preconditions.checkNotNull(keyedStateBackend, "keyed backend is not initialised");
+ keyedStateBackend.setCurrentKey(key);
+ }
+
+ @SuppressWarnings("unchecked")
+ <N, S extends State, V> S createState(
+ StateDescriptor<S, V> stateDescriptor,
+ @SuppressWarnings("SameParameterValue") N defaultNamespace) throws Exception {
+ S state = keyedStateBackend.getOrCreateKeyedState(StringSerializer.INSTANCE, stateDescriptor);
+ ((InternalKvState<?, N, ?>) state).setCurrentNamespace(defaultNamespace);
+ return state;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
deleted file mode 100644
index 5d9c682..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-/** Test suite for {@link TtlAggregatingState}. */
-public class TtlAggregatingStateTest
- extends TtlMergingStateBase.TtlIntegerMergingStateBase<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
- private static final long DEFAULT_ACCUMULATOR = 3L;
-
- @Override
- void initTestValues() {
- updater = v -> ttlState.add(v);
- getter = () -> ttlState.get();
- originalGetter = () -> ttlState.original.get();
-
- updateEmpty = 5;
- updateUnexpired = 7;
- updateExpired = 6;
-
- getUpdateEmpty = "8";
- getUnexpired = "15";
- getUpdateExpired = "9";
- }
-
- @Override
- TtlAggregatingState<?, String, Integer, Long, String> createState() {
- AggregatingStateDescriptor<Integer, Long, String> aggregatingStateDes =
- new AggregatingStateDescriptor<>("TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE);
- return (TtlAggregatingState<?, String, Integer, Long, String>) wrapMockState(aggregatingStateDes);
- }
-
- @Override
- String getMergeResult(
- List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
- List<Tuple2<String, Integer>> finalUpdatesToMerge) {
- Set<String> namespaces = new HashSet<>();
- unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
- finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
- return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) +
- namespaces.size() * (int) DEFAULT_ACCUMULATOR);
- }
-
- private static final AggregateFunction<Integer, Long, String> AGGREGATE =
- new AggregateFunction<Integer, Long, String>() {
- @Override
- public Long createAccumulator() {
- return DEFAULT_ACCUMULATOR;
- }
-
- @Override
- public Long add(Integer value, Long accumulator) {
- return accumulator + value;
- }
-
- @Override
- public String getResult(Long accumulator) {
- return accumulator.toString();
- }
-
- @Override
- public Long merge(Long a, Long b) {
- return a + b;
- }
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
new file mode 100644
index 0000000..b19391a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlAggregatingStateTestContext.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.AggregateFunction;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** Test context for {@link TtlAggregatingState}: supplies the descriptor, test values and accessors used by the TTL tests. */
+class TtlAggregatingStateTestContext
+ extends TtlMergingStateTestContext.TtlIntegerMergingStateTestContext<TtlAggregatingState<?, String, Integer, Long, String>, Integer, String> {
+ private static final long DEFAULT_ACCUMULATOR = 3L; // initial accumulator returned by AGGREGATE.createAccumulator()
+
+ @Override
+ void initTestValues() {
+ updateEmpty = 5;
+ updateUnexpired = 7;
+ updateExpired = 6;
+
+ getUpdateEmpty = "8"; // 3 (initial accumulator) + 5
+ getUnexpired = "15"; // 3 + 5 + 7, presumably both updates landing on the same unexpired accumulator
+ getUpdateExpired = "9"; // 3 + 6, presumably a fresh accumulator after expiry
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new AggregatingStateDescriptor<>(
+ "TtlTestAggregatingState", AGGREGATE, LongSerializer.INSTANCE);
+ }
+
+ @Override
+ void update(Integer value) throws Exception {
+ ttlState.add(value);
+ }
+
+ @Override
+ String get() throws Exception {
+ return ttlState.get();
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.get(); // reads the wrapped state directly, bypassing the TTL wrapper's get()
+ }
+
+ @Override
+ String getMergeResult(
+ List<Tuple2<String, Integer>> unexpiredUpdatesToMerge,
+ List<Tuple2<String, Integer>> finalUpdatesToMerge) {
+ Set<String> namespaces = new HashSet<>(); // distinct namespaces (f0) seen in either list
+ unexpiredUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+ finalUpdatesToMerge.forEach(t -> namespaces.add(t.f0));
+ return Integer.toString(getIntegerMergeResult(unexpiredUpdatesToMerge, finalUpdatesToMerge) +
+ namespaces.size() * (int) DEFAULT_ACCUMULATOR); // plus one initial accumulator per distinct namespace
+ }
+
+ private static final AggregateFunction<Integer, Long, String> AGGREGATE =
+ new AggregateFunction<Integer, Long, String>() {
+ @Override
+ public Long createAccumulator() {
+ return DEFAULT_ACCUMULATOR;
+ }
+
+ @Override
+ public Long add(Integer value, Long accumulator) {
+ return accumulator + value;
+ }
+
+ @Override
+ public String getResult(Long accumulator) {
+ return accumulator.toString();
+ }
+
+ @Override
+ public Long merge(Long a, Long b) {
+ return a + b;
+ }
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
deleted file mode 100644
index 8dac8ca..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.ttl;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-
-/** Test suite for {@link TtlFoldingState}. */
-@SuppressWarnings("deprecation")
-public class TtlFoldingStateTest extends TtlStateTestBase<TtlFoldingState<?, String, Long, String>, Long, String> {
- @Override
- void initTestValues() {
- updater = v -> ttlState.add(v);
- getter = () -> ttlState.get();
- originalGetter = () -> ttlState.original.get();
-
- updateEmpty = 5L;
- updateUnexpired = 7L;
- updateExpired = 6L;
-
- getUpdateEmpty = "6";
- getUnexpired = "13";
- getUpdateExpired = "7";
- }
-
- @Override
- TtlFoldingState<?, String, Long, String> createState() {
- FoldingStateDescriptor<Long, String> foldingStateDesc =
- new FoldingStateDescriptor<>("TtlTestFoldingState", "1", FOLD, StringSerializer.INSTANCE);
- return (TtlFoldingState<?, String, Long, String>) wrapMockState(foldingStateDesc);
- }
-
- private static final FoldFunction<Long, String> FOLD = (acc, val) -> {
- long lacc = acc == null ? 0 : Long.parseLong(acc);
- return Long.toString(val == null ? lacc : lacc + val);
- };
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/f45b7f7f/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
new file mode 100644
index 0000000..2b072b9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ttl/TtlFoldingStateTestContext.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.ttl;
+
+import org.apache.flink.api.common.functions.FoldFunction;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.StateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+
+/** Test context for {@link TtlFoldingState}: supplies the descriptor, test values and accessors used by the TTL tests. */
+@SuppressWarnings("deprecation")
+class TtlFoldingStateTestContext extends TtlStateTestContextBase<TtlFoldingState<?, String, Long, String>, Long, String> {
+ @Override
+ void initTestValues() {
+ updateEmpty = 5L;
+ updateUnexpired = 7L;
+ updateExpired = 6L;
+
+ getUpdateEmpty = "6"; // 1 (initial value "1" of the descriptor) + 5
+ getUnexpired = "13"; // 1 + 5 + 7, presumably both updates folded into the same unexpired value
+ getUpdateExpired = "7"; // 1 + 6, presumably folding restarts from the initial value after expiry
+ }
+
+ @Override
+ void update(Long value) throws Exception {
+ ttlState.add(value);
+ }
+
+ @Override
+ String get() throws Exception {
+ return ttlState.get();
+ }
+
+ @Override
+ Object getOriginal() throws Exception {
+ return ttlState.original.get(); // reads the wrapped state directly, bypassing the TTL wrapper's get()
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ <US extends State, SV> StateDescriptor<US, SV> createStateDescriptor() {
+ return (StateDescriptor<US, SV>) new FoldingStateDescriptor<>(
+ "TtlTestFoldingState", "1", FOLD, StringSerializer.INSTANCE);
+ }
+
+ private static final FoldFunction<Long, String> FOLD = (acc, val) -> { // sums longs, carrying the total as a decimal string
+ long lacc = acc == null ? 0 : Long.parseLong(acc); // a null accumulator counts as 0
+ return Long.toString(val == null ? lacc : lacc + val); // a null value leaves the accumulator unchanged
+ };
+}