You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/14 18:38:06 UTC

[flink] branch release-1.6 updated: [FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 51bd389  [FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance
51bd389 is described below

commit 51bd38985411d684ae1a8dbbf56c801a9da2ae96
Author: Andrey Zagrebin <az...@gmail.com>
AuthorDate: Wed Sep 12 14:29:26 2018 +0200

    [FLINK-10325] [State TTL] Refactor TtlListState to use only loops, no java stream API for performance
    
    This closes #6683.
---
 .../flink/runtime/state/ttl/TtlListState.java      | 34 +++++++++++++++-------
 .../flink/runtime/state/ttl/TtlMapState.java       |  6 ++--
 .../apache/flink/runtime/state/ttl/TtlUtils.java   | 16 ++++++++--
 .../apache/flink/runtime/state/ttl/TtlValue.java   |  2 ++
 4 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
index cb64df7..b384d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlListState.java
@@ -23,13 +23,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.internal.InternalListState;
 import org.apache.flink.util.Preconditions;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 /**
  * This class wraps list state with TTL logic.
@@ -73,11 +72,14 @@ class TtlListState<K, N, T> extends
 		return () -> new IteratorWithCleanup(finalResult.iterator());
 	}
 
-	private void updateTs(List<TtlValue<T>> ttlValue) throws Exception {
-		List<TtlValue<T>> unexpiredWithUpdatedTs = ttlValue.stream()
-			.filter(v -> !expired(v))
-			.map(this::rewrapWithNewTs)
-			.collect(Collectors.toList());
+	private void updateTs(List<TtlValue<T>> ttlValues) throws Exception {
+		List<TtlValue<T>> unexpiredWithUpdatedTs = new ArrayList<>(ttlValues.size());
+		long currentTimestamp = timeProvider.currentTimestamp();
+		for (TtlValue<T> ttlValue : ttlValues) {
+			if (!TtlUtils.expired(ttlValue, ttl, currentTimestamp)) {
+				unexpiredWithUpdatedTs.add(TtlUtils.wrapWithTs(ttlValue.getUserValue(), currentTimestamp));
+			}
+		}
 		if (!unexpiredWithUpdatedTs.isEmpty()) {
 			original.update(unexpiredWithUpdatedTs);
 		}
@@ -105,8 +107,15 @@ class TtlListState<K, N, T> extends
 	}
 
 	private <E> List<E> collect(Iterable<E> iterable) {
-		return iterable instanceof List ? (List<E>) iterable :
-			StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList());
+		if (iterable instanceof List) {
+			return (List<E>) iterable;
+		} else {
+			List<E> list = new ArrayList<>();
+			for (E element : iterable) {
+				list.add(element);
+			}
+			return list;
+		}
 	}
 
 	@Override
@@ -116,7 +125,12 @@ class TtlListState<K, N, T> extends
 	}
 
 	private List<TtlValue<T>> withTs(List<T> values) {
-		return values.stream().map(this::wrapWithTs).collect(Collectors.toList());
+		List<TtlValue<T>> withTs = new ArrayList<>(values.size());
+		for (T value : values) {
+			Preconditions.checkNotNull(value, "You cannot have null element in a ListState.");
+			withTs.add(wrapWithTs(value));
+		}
+		return withTs;
 	}
 
 	private class IteratorWithCleanup implements Iterator<T> {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
index f6f81ff..160dbeb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlMapState.java
@@ -68,8 +68,10 @@ class TtlMapState<K, N, UK, UV>
 			return;
 		}
 		Map<UK, TtlValue<UV>> ttlMap = new HashMap<>(map.size());
-		for (UK key : map.keySet()) {
-			ttlMap.put(key, wrapWithTs(map.get(key)));
+		long currentTimestamp = timeProvider.currentTimestamp();
+		for (Map.Entry<UK, UV> entry : map.entrySet()) {
+			UK key = entry.getKey();
+			ttlMap.put(key, TtlUtils.wrapWithTs(entry.getValue(), currentTimestamp));
 		}
 		original.putAll(ttlMap);
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
index 9d9e5e1..773fe7c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlUtils.java
@@ -18,14 +18,24 @@
 
 package org.apache.flink.runtime.state.ttl;
 
+import javax.annotation.Nullable;
+
 /** Common functions related to State TTL. */
 class TtlUtils {
-	static <V> boolean expired(TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
-		return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, timeProvider);
+	static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, TtlTimeProvider timeProvider) {
+		return expired(ttlValue, ttl, timeProvider.currentTimestamp());
+	}
+
+	static <V> boolean expired(@Nullable TtlValue<V> ttlValue, long ttl, long currentTimestamp) {
+		return ttlValue != null && expired(ttlValue.getLastAccessTimestamp(), ttl, currentTimestamp);
 	}
 
 	static boolean expired(long ts, long ttl, TtlTimeProvider timeProvider) {
-		return getExpirationTimestamp(ts, ttl) <= timeProvider.currentTimestamp();
+		return expired(ts, ttl, timeProvider.currentTimestamp());
+	}
+
+	private static boolean expired(long ts, long ttl, long currentTimestamp) {
+		return getExpirationTimestamp(ts, ttl) <= currentTimestamp;
 	}
 
 	private static long getExpirationTimestamp(long ts, long ttl) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
index 99f8b0b..a8bcadf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValue.java
@@ -28,6 +28,8 @@ import java.io.Serializable;
  * @param <T> Type of the user value of state with TTL
  */
 class TtlValue<T> implements Serializable {
+	private static final long serialVersionUID = 5221129704201125020L;
+
 	private final T userValue;
 	private final long lastAccessTimestamp;