You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/22 04:35:46 UTC

[flink] 02/03: [FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93f1bcc3d3262d961709de3ea16b12dd6ea40c0b
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jul 10 19:33:29 2019 -0400

    [FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service
    
    Certain operations require querying the registered timers for a key but
    timers are stored TIMESTAMP -> KEY[] which would only support O(n)
    reads. Because the timer service sits in the per-record code path we opt
    to add forEach*RegisteredTimer methods so consumers can copy data into a
    more appropriate data structure to avoid any accidental performance
    degredations.
---
 .../api/operators/InternalTimerService.java        | 13 ++++
 .../api/operators/InternalTimerServiceImpl.java    | 21 ++++++
 .../operators/InternalTimerServiceImplTest.java    | 79 ++++++++++++++++++++++
 .../api/operators/TestInternalTimerService.java    | 17 +++++
 4 files changed, 130 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
index cb171fb..f736053 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 /**
  * Interface for working with time and timers.
@@ -58,4 +59,16 @@ public interface InternalTimerService<N> {
 	 * Deletes the timer for the given key and namespace.
 	 */
 	void deleteEventTimeTimer(N namespace, long time);
+
+	/**
+	 * Performs an action for each registered timer. The timer service will
+	 * set the key context for the timers key before invoking the action.
+	 */
+	void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
+
+	/**
+	 * Performs an action for each registered timer. The timer service will
+	 * set the key context for the timers key before invoking the action.
+	 */
+	void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index dd88bc6..08f3a1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -226,6 +227,26 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>,
 	}
 
 	@Override
+	public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		foreachTimer(consumer, eventTimeTimersQueue);
+	}
+
+	@Override
+	public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		foreachTimer(consumer, processingTimeTimersQueue);
+	}
+
+	private void foreachTimer(BiConsumerWithException<N, Long, Exception> consumer, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) throws Exception {
+		try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
+			while (iterator.hasNext()) {
+				final TimerHeapInternalTimer<K, N> timer = iterator.next();
+				keyContext.setCurrentKey(timer.getKey());
+				consumer.accept(timer.getNamespace(), timer.getTimestamp());
+			}
+		}
+	}
+
+	@Override
 	public void onProcessingTime(long time) throws Exception {
 		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
 		// inside the callback.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
index 99d2700..777ef2b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -560,6 +561,84 @@ public class InternalTimerServiceImplTest {
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
 
+	/**
+	 * This also verifies that we iterate over all timers and set the key context on each element.
+	 */
+	@Test
+	public void testForEachEventTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		InternalTimerServiceImpl<Integer, String> timerService =
+			createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+		timers.add(Tuple3.of(key1, "ciao", 10L));
+		timers.add(Tuple3.of(key1, "hello", 10L));
+		timers.add(Tuple3.of(key2, "ciao", 10L));
+		timers.add(Tuple3.of(key2, "hello", 10L));
+
+		for (Tuple3<Integer, String, Long> timer : timers) {
+			keyContext.setCurrentKey(timer.f0);
+			timerService.registerEventTimeTimer(timer.f1, timer.f2);
+		}
+
+		Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+		timerService.forEachEventTimeTimer((namespace, timer) -> {
+			results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer));
+		});
+
+		Assert.assertEquals(timers, results);
+	}
+
+	/**
+	 * This also verifies that we iterate over all timers and set the key context on each element.
+	 */
+	@Test
+	public void testForEachProcessingTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		InternalTimerServiceImpl<Integer, String> timerService =
+			createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+		timers.add(Tuple3.of(key1, "ciao", 10L));
+		timers.add(Tuple3.of(key1, "hello", 10L));
+		timers.add(Tuple3.of(key2, "ciao", 10L));
+		timers.add(Tuple3.of(key2, "hello", 10L));
+
+		for (Tuple3<Integer, String, Long> timer : timers) {
+			keyContext.setCurrentKey(timer.f0);
+			timerService.registerProcessingTimeTimer(timer.f1, timer.f2);
+		}
+
+		Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+		timerService.forEachProcessingTimeTimer((namespace, timer) -> {
+			results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer));
+		});
+
+		Assert.assertEquals(timers, results);
+	}
+
 	@Test
 	public void testSnapshotAndRestore() throws Exception {
 		testSnapshotAndRestore(InternalTimerServiceSerializationProxy.VERSION);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
index f8b095c..777dcbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -108,6 +109,22 @@ public class TestInternalTimerService<K, N> implements InternalTimerService<N> {
 		}
 	}
 
+	@Override
+	public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		for (Timer<K, N> timer : watermarkTimers) {
+			keyContext.setCurrentKey(timer.getKey());
+			consumer.accept(timer.getNamespace(), timer.getTimestamp());
+		}
+	}
+
+	@Override
+	public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		for (Timer<K, N> timer : processingTimeTimers) {
+			keyContext.setCurrentKey(timer.getKey());
+			consumer.accept(timer.getNamespace(), timer.getTimestamp());
+		}
+	}
+
 	public Collection<Timer<K, N>> advanceProcessingTime(long time) throws Exception {
 		List<Timer<K, N>> result = new ArrayList<>();