You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 18:38:06 UTC
[flink] branch release-1.6 updated: [FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.6 by this push:
new 51bd389 [FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance
51bd389 is described below
commit 51bd38985411d684ae1a8dbbf56c801a9da2ae96
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 12 14:29:26 2018 +0200
[FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance
This closes #6683.
---
.../flink/runtime/state/ttl/TtlListState.java | 34 +++++++++++++++-------
.../flink/runtime/state/ttl/TtlMapState.java | 6 ++--
.../apache/flink/runtime/state/ttl/TtlUtils.java | 16 ++++++++--
.../apache/flink/runtime/state/ttl/TtlValue.java | 2 ++
4 files changed, 43 insertions(+), 15 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index cb64df7..b384d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -23,13 +23,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
/**
* This class wraps list state with TTL logic.
@@ -73,11 +72,14 @@ class TtlListState<K, N, T> extends
return () -> new IteratorWithCleanup(finalResult.iterator());
}
- private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
- List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
- .filter(v -> !expired(v))
- .map(this::rewrapWithNewTs)
- .collect(Collectors.toList());
+ private void updateTs(List<TtlValue<T>> ttlValues) throws Exception {
+ List<TtlValue<T>> unexpiredWithUpdatedTs = new ArrayList<>(ttlValues.size());
+ long currentTimestamp = timeProvider.currentTimestamp();
+ for (TtlValue<T> ttlValue : ttlValues) {
+ if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+ unexpiredWithUpdatedTs.add(TtlUtils.wrapWithTs(ttlValue.getUserValue(), currentTimestamp));
+ }
+ }
if (!unexpiredWithUpdatedTs.isEmpty()) {
original.update(unexpiredWithUpdatedTs);
}
@@ -105,8 +107,15 @@ class TtlListState<K, N, T> extends
}
private <E> List<E> collect(Iterable<E> iterable) {
- return iterable instanceof List ? (List<E>) iterable :
- StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+ if (iterable instanceof List) {
+ return (List<E>) iterable;
+ } else {
+ List<E> list = new ArrayList<>();
+ for (E element : iterable) {
+ list.add(element);
+ }
+ return list;
+ }
}
@Override
@@ -116,7 +125,12 @@ class TtlListState<K, N, T> extends
}
private List<TtlValue<T>> withTs(List<T> values) {
- return values.stream().map(this::wrapWithTs).collect(Collectors.toList());
+ List<TtlValue<T>> withTs = new ArrayList<>(values.size());
+ for (T value : values) {
+ Preconditions.checkNotNull(value, "You cannot have null element in a ListState.");
+ withTs.add(wrapWithTs(value));
+ }
+ return withTs;
}
private class IteratorWithCleanup implements Iterator<T> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f6f81ff..160dbeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -68,8 +68,10 @@ class TtlMapState<K, N, UK, UV>
return;
}
Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
- for (UK key : map.keySet()) {
- ttlMap.put(key, wrapWithTs(map.get(key)));
+ long currentTimestamp = timeProvider.currentTimestamp();
+ for (Map.Entry<UK, UV> entry : map.entrySet()) {
+ UK key = entry.getKey();
+ ttlMap.put(key, TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp));
}
original.putAll(ttlMap);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index 9d9e5e1..773fe7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -18,14 +18,24 @@
package org.apache.flink.runtime.state.ttl;
+import javax.annotation.Nullable;
+
/** Common functions related to State TTL. */
class TtlUtils {
- static <V> boolean expired(TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
- return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, timeProvider);
+ static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
+ return expired(ttlValue, ttl, timeProvider.currentTimestamp());
+ }
+
+ static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
+ return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
}
static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
- return getExpirationTimestamp(ts, ttl) <= timeProvider.currentTimestamp();
+ return expired(ts, ttl, timeProvider.currentTimestamp());
+ }
+
+ private static boolean expired(long ts, long ttl, long currentTimestamp) {
+ return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
}
private static long getExpirationTimestamp(long ts, long ttl) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index 99f8b0b..a8bcadf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -28,6 +28,8 @@ import java.io.Serializable;
* @param <T> Type of the user value of state with TTL
*/
class TtlValue<T> implements Serializable {
+ private static final long serialVersionUID = 5221129704201125020L;
+
private final T userValue;
private final long lastAccessTimestamp;