You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/22 04:35:46 UTC
[flink] 02/03: [FLINK-13094][state-processor-api] Support iterating
over registered timers in the internal timer service
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 93f1bcc3d3262d961709de3ea16b12dd6ea40c0b
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jul 10 19:33:29 2019 -0400
[FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service
Certain operations require querying the registered timers for a key, but
timers are stored as TIMESTAMP -> KEY[], which only supports O(n)
reads. Because the timer service sits in the per-record code path, we opt
to add forEach*Timer methods so consumers can copy data into a
more appropriate data structure to avoid any accidental performance
degradations.
---
.../api/operators/InternalTimerService.java | 13 ++++
.../api/operators/InternalTimerServiceImpl.java | 21 ++++++
.../operators/InternalTimerServiceImplTest.java | 79 ++++++++++++++++++++++
.../api/operators/TestInternalTimerService.java | 17 +++++
4 files changed, 130 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
index cb171fb..f736053 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
/**
* Interface for working with time and timers.
@@ -58,4 +59,16 @@ public interface InternalTimerService<N> {
* Deletes the timer for the given key and namespace.
*/
void deleteEventTimeTimer(N namespace, long time);
+
+ /**
+ * Performs an action for each registered timer. The timer service will
+ * set the key context for the timer's key before invoking the action.
+ */
+ void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
+
+ /**
+ * Performs an action for each registered timer. The timer service will
+ * set the key context for the timer's key before invoking the action.
+ */
+ void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index dd88bc6..08f3a1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
import java.util.ArrayList;
import java.util.Collections;
@@ -226,6 +227,26 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>,
}
@Override
+ public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+ foreachTimer(consumer, eventTimeTimersQueue);
+ }
+
+ @Override
+ public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+ foreachTimer(consumer, processingTimeTimersQueue);
+ }
+
+ private void foreachTimer(BiConsumerWithException<N, Long, Exception> consumer, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) throws Exception {
+ try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
+ while (iterator.hasNext()) {
+ final TimerHeapInternalTimer<K, N> timer = iterator.next();
+ keyContext.setCurrentKey(timer.getKey());
+ consumer.accept(timer.getNamespace(), timer.getTimestamp());
+ }
+ }
+ }
+
+ @Override
public void onProcessingTime(long time) throws Exception {
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
index 99d2700..777ef2b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -560,6 +561,84 @@ public class InternalTimerServiceImplTest {
assertEquals(0, timerService.numEventTimeTimers());
}
+ /**
+ * This also verifies that we iterate over all timers and set the key context on each element.
+ */
+ @Test
+ public void testForEachEventTimeTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ InternalTimerServiceImpl<Integer, String> timerService =
+ createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+ // get two different keys
+ int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+ int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+ while (key2 == key1) {
+ key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+ }
+
+ Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+ timers.add(Tuple3.of(key1, "ciao", 10L));
+ timers.add(Tuple3.of(key1, "hello", 10L));
+ timers.add(Tuple3.of(key2, "ciao", 10L));
+ timers.add(Tuple3.of(key2, "hello", 10L));
+
+ for (Tuple3<Integer, String, Long> timer : timers) {
+ keyContext.setCurrentKey(timer.f0);
+ timerService.registerEventTimeTimer(timer.f1, timer.f2);
+ }
+
+ Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+ timerService.forEachEventTimeTimer((namespace, timer) -> {
+ results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer));
+ });
+
+ Assert.assertEquals(timers, results);
+ }
+
+ /**
+ * This also verifies that we iterate over all timers and set the key context on each element.
+ */
+ @Test
+ public void testForEachProcessingTimeTimers() throws Exception {
+ @SuppressWarnings("unchecked")
+ Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+ TestKeyContext keyContext = new TestKeyContext();
+ TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+ InternalTimerServiceImpl<Integer, String> timerService =
+ createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+ // get two different keys
+ int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+ int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+ while (key2 == key1) {
+ key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+ }
+
+ Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+ timers.add(Tuple3.of(key1, "ciao", 10L));
+ timers.add(Tuple3.of(key1, "hello", 10L));
+ timers.add(Tuple3.of(key2, "ciao", 10L));
+ timers.add(Tuple3.of(key2, "hello", 10L));
+
+ for (Tuple3<Integer, String, Long> timer : timers) {
+ keyContext.setCurrentKey(timer.f0);
+ timerService.registerProcessingTimeTimer(timer.f1, timer.f2);
+ }
+
+ Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+ timerService.forEachProcessingTimeTimer((namespace, timer) -> {
+ results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer));
+ });
+
+ Assert.assertEquals(timers, results);
+ }
+
@Test
public void testSnapshotAndRestore() throws Exception {
testSnapshotAndRestore(InternalTimerServiceSerializationProxy.VERSION);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
index f8b095c..777dcbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
import java.util.ArrayList;
import java.util.Collection;
@@ -108,6 +109,22 @@ public class TestInternalTimerService<K, N> implements InternalTimerService<N> {
}
}
+ @Override
+ public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+ for (Timer<K, N> timer : watermarkTimers) {
+ keyContext.setCurrentKey(timer.getKey());
+ consumer.accept(timer.getNamespace(), timer.getTimestamp());
+ }
+ }
+
+ @Override
+ public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+ for (Timer<K, N> timer : processingTimeTimers) {
+ keyContext.setCurrentKey(timer.getKey());
+ consumer.accept(timer.getNamespace(), timer.getTimestamp());
+ }
+ }
+
public Collection<Timer<K, N>> advanceProcessingTime(long time) throws Exception {
List<Timer<K, N>> result = new ArrayList<>();