You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/22 04:39:46 UTC

[flink] branch release-1.9 updated (9dec1aa -> b8eec2b)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9dec1aa  [FLINK-13274]Refactor HiveTableSourceTest using HiveRunner
     new 752e7ae  [FLINK-13293] [state-processor-api] [build] Add state processor api to opt/ directory in flink-dist
     new 7d546b6  [FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service
     new b8eec2b  [FLINK-13094][state-processor-api] Add registered*TimeTimers methods to KeyedStateReaderFunction#Context for querying the registered timers for a given key

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 flink-dist/pom.xml                                 |  7 ++
 flink-dist/src/main/assemblies/opt.xml             |  8 ++
 .../api/functions/KeyedStateReaderFunction.java    | 12 +++
 .../state/api/input/KeyedStateInputFormat.java     | 92 ++++++++++++++++++++-
 .../state/api/SavepointReaderKeyedStateITCase.java | 93 +++++++++++++++++-----
 .../state/api/input/KeyedStateInputFormatTest.java | 43 ++++++++++
 .../api/operators/InternalTimerService.java        | 13 +++
 .../api/operators/InternalTimerServiceImpl.java    | 21 +++++
 .../streaming/api/operators/TimerSerializer.java   |  4 +-
 .../operators/InternalTimerServiceImplTest.java    | 79 ++++++++++++++++++
 .../api/operators/TestInternalTimerService.java    | 17 ++++
 11 files changed, 367 insertions(+), 22 deletions(-)


[flink] 03/03: [FLINK-13094][state-processor-api] Add registered*TimeTimers methods to KeyedStateReaderFunction#Context for querying the registered timers for a given key

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b8eec2b822af8648c73422bcb86ae6b3822420a9
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jul 10 19:38:16 2019 -0400

    [FLINK-13094][state-processor-api] Add registered*TimeTimers methods to KeyedStateReaderFunction#Context for querying the registered timers for a given key
    
    This closes #9094.
---
 .../api/functions/KeyedStateReaderFunction.java    | 12 +++
 .../state/api/input/KeyedStateInputFormat.java     | 92 ++++++++++++++++++++-
 .../state/api/SavepointReaderKeyedStateITCase.java | 93 +++++++++++++++++-----
 .../state/api/input/KeyedStateInputFormatTest.java | 43 ++++++++++
 .../streaming/api/operators/TimerSerializer.java   |  4 +-
 5 files changed, 222 insertions(+), 22 deletions(-)

diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
index c5e947e..024c1a0 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/functions/KeyedStateReaderFunction.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Collector;
 
+import java.util.Set;
+
 /**
  * A function that processes keys from a restored operator
  *
@@ -79,6 +81,16 @@ public abstract class KeyedStateReaderFunction<K, OUT> extends AbstractRichFunct
 	 * afterwards!
 	 */
 	public interface Context {
+
+		/**
+		 * @return All registered event time timers for the current key.
+		 */
+		Set<Long> registeredEventTimeTimers() throws Exception;
+
+		/**
+		 * @return All registered processing time timers for the current key.
+		 */
+		Set<Long> registeredProcessingTimeTimers() throws Exception;
 	}
 }
 
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
index c39369a..f0bf44f 100644
--- a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/input/KeyedStateInputFormat.java
@@ -23,9 +23,13 @@ import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.RichInputFormat;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.io.InputSplitAssigner;
@@ -37,24 +41,34 @@ import org.apache.flink.runtime.state.DefaultKeyedStateStore;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
 import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
 import org.apache.flink.state.api.runtime.NeverFireProcessingTimeService;
 import org.apache.flink.state.api.runtime.SavepointEnvironment;
 import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
+import org.apache.flink.state.api.runtime.VoidTriggerable;
+import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.KeyContext;
 import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
+import org.apache.flink.streaming.api.operators.TimerSerializer;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
 
 /**
  * Input format for reading partitioned state.
@@ -67,6 +81,8 @@ public class KeyedStateInputFormat<K, OUT> extends RichInputFormat<OUT, KeyGroup
 
 	private static final long serialVersionUID = 8230460226049597182L;
 
+	private static final String USER_TIMERS_NAME = "user-timers";
+
 	private final OperatorState operatorState;
 
 	private final StateBackend stateBackend;
@@ -167,7 +183,20 @@ public class KeyedStateInputFormat<K, OUT> extends RichInputFormat<OUT, KeyGroup
 		FunctionUtils.setFunctionRuntimeContext(userFunction, ctx);
 
 		keys = getKeyIterator(ctx);
-		this.ctx = new Context();
+
+		final InternalTimerService<VoidNamespace> timerService = restoreTimerService(context);
+		try {
+			this.ctx = new Context(keyedStateBackend, timerService);
+		} catch (Exception e) {
+			throw new IOException("Failed to restore timer state", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private InternalTimerService<VoidNamespace> restoreTimerService(StreamOperatorStateContext context) {
+		InternalTimeServiceManager<K> timeServiceManager = (InternalTimeServiceManager<K>) context.internalTimerServiceManager();
+		TimerSerializer<K, VoidNamespace> timerSerializer = new TimerSerializer<>(keySerializer, VoidNamespaceSerializer.INSTANCE);
+		return timeServiceManager.getInternalTimerService(USER_TIMERS_NAME, timerSerializer, VoidTriggerable.instance());
 	}
 
 	@SuppressWarnings("unchecked")
@@ -266,5 +295,64 @@ public class KeyedStateInputFormat<K, OUT> extends RichInputFormat<OUT, KeyGroup
 		return keyGroups;
 	}
 
-	private static class Context implements KeyedStateReaderFunction.Context {}
+	private static class Context<K> implements KeyedStateReaderFunction.Context {
+
+		private static final String EVENT_TIMER_STATE = "event-time-timers";
+
+		private static final String PROC_TIMER_STATE = "proc-time-timers";
+
+		ListState<Long> eventTimers;
+
+		ListState<Long> procTimers;
+
+		private Context(AbstractKeyedStateBackend<K> keyedStateBackend, InternalTimerService<VoidNamespace> timerService) throws Exception {
+			eventTimers = keyedStateBackend.getPartitionedState(
+				USER_TIMERS_NAME,
+				StringSerializer.INSTANCE,
+				new ListStateDescriptor<>(EVENT_TIMER_STATE, Types.LONG)
+			);
+
+			timerService.forEachEventTimeTimer((namespace, timer) -> {
+				if (namespace.equals(VoidNamespace.INSTANCE)) {
+					eventTimers.add(timer);
+				}
+			});
+
+			procTimers = keyedStateBackend.getPartitionedState(
+				USER_TIMERS_NAME,
+				StringSerializer.INSTANCE,
+				new ListStateDescriptor<>(PROC_TIMER_STATE, Types.LONG)
+			);
+
+			timerService.forEachProcessingTimeTimer((namespace, timer) -> {
+				if (namespace.equals(VoidNamespace.INSTANCE)) {
+					procTimers.add(timer);
+				}
+			});
+		}
+
+		@Override
+		public Set<Long> registeredEventTimeTimers() throws Exception {
+			Iterable<Long> timers = eventTimers.get();
+			if (timers == null) {
+				return Collections.emptySet();
+			}
+
+			return StreamSupport
+				.stream(timers.spliterator(), false)
+				.collect(Collectors.toSet());
+		}
+
+		@Override
+		public Set<Long> registeredProcessingTimeTimers() throws Exception {
+			Iterable<Long> timers = procTimers.get();
+			if (timers == null) {
+				return Collections.emptySet();
+			}
+
+			return StreamSupport
+				.stream(timers.spliterator(), false)
+				.collect(Collectors.toSet());
+		}
+	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
index 663c111..915f9d2 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointReaderKeyedStateITCase.java
@@ -45,8 +45,11 @@ import org.junit.Test;
 
 import java.time.Duration;
 import java.util.Arrays;
-import java.util.Comparator;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -77,7 +80,7 @@ public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
 		streamEnv
 			.addSource(new SavepointSource())
 			.rebalance()
-			.keyBy(id -> id)
+			.keyBy(id -> id.key)
 			.process(new KeyedStatefulOperator())
 			.uid(uid)
 			.addSink(new DiscardingSink<>());
@@ -89,15 +92,13 @@ public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
 		ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
 		ExistingSavepoint savepoint = Savepoint.load(batchEnv, path, backend);
 
-		List<Integer> results = savepoint
+		List<Pojo> results = savepoint
 			.readKeyedState(uid, new Reader())
 			.collect();
 
-		results.sort(Comparator.naturalOrder());
+		Set<Pojo> expected = SavepointSource.getElements();
 
-		List<Integer> expected = SavepointSource.getElements();
-
-		Assert.assertEquals("Unexpected results from keyed state", expected, results);
+		Assert.assertEquals("Unexpected results from keyed state", expected, new HashSet<>(results));
 	}
 
 	private String takeSavepoint(JobGraph jobGraph) throws Exception {
@@ -136,17 +137,20 @@ public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
 		}
 	}
 
-	private static class SavepointSource implements SourceFunction<Integer> {
+	private static class SavepointSource implements SourceFunction<Pojo> {
 		private static volatile boolean finished;
 
 		private volatile boolean running = true;
 
-		private static final Integer[] elements = {1, 2, 3};
+		private static final Pojo[] elements = {
+			Pojo.of(1, 1),
+			Pojo.of(2, 2),
+			Pojo.of(3, 3)};
 
 		@Override
-		public void run(SourceContext<Integer> ctx) {
+		public void run(SourceContext<Pojo> ctx) {
 			synchronized (ctx.getCheckpointLock()) {
-				for (Integer element : elements) {
+				for (Pojo element : elements) {
 					ctx.collect(element);
 				}
 
@@ -175,12 +179,12 @@ public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
 			return finished;
 		}
 
-		private static List<Integer> getElements() {
-			return Arrays.asList(elements);
+		private static Set<Pojo> getElements() {
+			return new HashSet<>(Arrays.asList(elements));
 		}
 	}
 
-	private static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Integer, Void> {
+	private static class KeyedStatefulOperator extends KeyedProcessFunction<Integer, Pojo, Void> {
 
 		private transient ValueState<Integer> state;
 
@@ -190,12 +194,15 @@ public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
 		}
 
 		@Override
-		public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
-			state.update(value);
+		public void processElement(Pojo value, Context ctx, Collector<Void> out) throws Exception {
+			state.update(value.state);
+
+			value.eventTimeTimer.forEach(timer -> ctx.timerService().registerEventTimeTimer(timer));
+			value.processingTimeTimer.forEach(timer -> ctx.timerService().registerProcessingTimeTimer(timer));
 		}
 	}
 
-	private static class Reader extends KeyedStateReaderFunction<Integer, Integer> {
+	private static class Reader extends KeyedStateReaderFunction<Integer, Pojo> {
 
 		private transient ValueState<Integer> state;
 
@@ -205,8 +212,56 @@ public class SavepointReaderKeyedStateITCase extends AbstractTestBase {
 		}
 
 		@Override
-		public void readKey(Integer key, Context ctx, Collector<Integer> out) throws Exception {
-			out.collect(state.value());
+		public void readKey(Integer key, Context ctx, Collector<Pojo> out) throws Exception {
+			Pojo pojo = new Pojo();
+			pojo.key = key;
+			pojo.state = state.value();
+			pojo.eventTimeTimer = ctx.registeredEventTimeTimers();
+			pojo.processingTimeTimer = ctx.registeredProcessingTimeTimers();
+
+			out.collect(pojo);
+		}
+	}
+
+	/**
+	 * A simple pojo type.
+	 */
+	public static class Pojo {
+		public static Pojo of(Integer key, Integer state) {
+			Pojo wrapper = new Pojo();
+			wrapper.key = key;
+			wrapper.state = state;
+			wrapper.eventTimeTimer = Collections.singleton(Long.MAX_VALUE - 1);
+			wrapper.processingTimeTimer = Collections.singleton(Long.MAX_VALUE - 2);
+
+			return wrapper;
+		}
+
+		Integer key;
+
+		Integer state;
+
+		Set<Long> eventTimeTimer;
+
+		Set<Long> processingTimeTimer;
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			} else if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Pojo pojo = (Pojo) o;
+			return Objects.equals(key, pojo.key) &&
+				Objects.equals(state, pojo.state) &&
+				Objects.equals(eventTimeTimer, pojo.eventTimeTimer) &&
+				Objects.equals(processingTimeTimer, pojo.processingTimeTimer);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(key, state, eventTimeTimer, processingTimeTimer);
 		}
 	}
 }
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
index 2824b34..1327519 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/input/KeyedStateInputFormatTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
 import org.apache.flink.state.api.input.splits.KeyGroupRangeInputSplit;
 import org.apache.flink.state.api.runtime.OperatorIDGenerator;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamFlatMap;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
@@ -48,6 +49,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Tests for keyed state input format.
@@ -135,6 +137,24 @@ public class KeyedStateInputFormatTest {
 		Assert.fail("KeyedStateReaderFunction did not fail on invalid RuntimeContext use");
 	}
 
+	@Test
+	public void testReadTime() throws Exception {
+		OperatorID operatorID = OperatorIDGenerator.fromUid("uid");
+
+		OperatorSubtaskState state = createOperatorSubtaskState(new KeyedProcessOperator<>(new StatefulFunctionWithTime()));
+		OperatorState operatorState = new OperatorState(operatorID, 1, 128);
+		operatorState.putState(0, state);
+
+		KeyedStateInputFormat<?, ?> format = new KeyedStateInputFormat<>(operatorState, new MemoryStateBackend(), Types.INT, new TimeReaderFunction());
+		KeyGroupRangeInputSplit split = format.createInputSplits(1)[0];
+
+		KeyedStateReaderFunction<Integer, Integer> userFunction = new TimeReaderFunction();
+
+		List<Integer> data = readInputSplit(split, userFunction);
+
+		Assert.assertEquals("Incorrect data read from input split", Arrays.asList(1, 1, 2, 2, 3, 3), data);
+	}
+
 	@Nonnull
 	private List<Integer> readInputSplit(KeyGroupRangeInputSplit split, KeyedStateReaderFunction<Integer, Integer> userFunction) throws IOException {
 		KeyedStateInputFormat<Integer, Integer> format = new KeyedStateInputFormat<>(
@@ -245,6 +265,29 @@ public class KeyedStateInputFormatTest {
 		public void processElement(Integer value, Context ctx, Collector<Void> out) throws Exception {
 			state.update(value);
 			ctx.timerService().registerEventTimeTimer(value);
+			ctx.timerService().registerProcessingTimeTimer(value);
+		}
+	}
+
+	static class TimeReaderFunction extends KeyedStateReaderFunction<Integer, Integer> {
+		ValueState<Integer> state;
+
+		@Override
+		public void open(Configuration parameters) {
+			state = getRuntimeContext().getState(stateDescriptor);
+		}
+
+		@Override
+		public void readKey(Integer key, KeyedStateReaderFunction.Context ctx, Collector<Integer> out) throws Exception {
+			Set<Long> eventTimers = ctx.registeredEventTimeTimers();
+			Assert.assertEquals("Each key should have exactly one event timer for key " + key, 1, eventTimers.size());
+
+			out.collect(eventTimers.iterator().next().intValue());
+
+			Set<Long> procTimers = ctx.registeredProcessingTimeTimers();
+			Assert.assertEquals("Each key should have exactly one processing timer for key " + key, 1, procTimers.size());
+
+			out.collect(procTimers.iterator().next().intValue());
 		}
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
index a9774e4..1abf8f2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerSerializer.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -39,6 +40,7 @@ import java.util.Objects;
  * @param <K> type of the timer key.
  * @param <N> type of the timer namespace.
  */
+@Internal
 public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer<K, N>> {
 
 	private static final long serialVersionUID = 1L;
@@ -60,7 +62,7 @@ public class TimerSerializer<K, N> extends TypeSerializer<TimerHeapInternalTimer
 	/** True iff the serialized type (and composite objects) are immutable. */
 	private final boolean immutableType;
 
-	TimerSerializer(
+	public TimerSerializer(
 		@Nonnull TypeSerializer<K> keySerializer,
 		@Nonnull TypeSerializer<N> namespaceSerializer) {
 		this(


[flink] 02/03: [FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7d546b6321e08de7fb75f6104914c216b0522e95
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Wed Jul 10 19:33:29 2019 -0400

    [FLINK-13094][state-processor-api] Support iterating over registered timers in the internal timer service
    
    Certain operations require querying the registered timers for a key but
    timers are stored TIMESTAMP -> KEY[] which would only support O(n)
    reads. Because the timer service sits in the per-record code path we opt
    to add forEach*RegisteredTimer methods so consumers can copy data into a
    more appropriate data structure to avoid any accidental performance
    degredations.
---
 .../api/operators/InternalTimerService.java        | 13 ++++
 .../api/operators/InternalTimerServiceImpl.java    | 21 ++++++
 .../operators/InternalTimerServiceImplTest.java    | 79 ++++++++++++++++++++++
 .../api/operators/TestInternalTimerService.java    | 17 +++++
 4 files changed, 130 insertions(+)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
index cb171fb..f736053 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 /**
  * Interface for working with time and timers.
@@ -58,4 +59,16 @@ public interface InternalTimerService<N> {
 	 * Deletes the timer for the given key and namespace.
 	 */
 	void deleteEventTimeTimer(N namespace, long time);
+
+	/**
+	 * Performs an action for each registered timer. The timer service will
+	 * set the key context for the timers key before invoking the action.
+	 */
+	void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
+
+	/**
+	 * Performs an action for each registered timer. The timer service will
+	 * set the key context for the timers key before invoking the action.
+	 */
+	void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception;
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
index dd88bc6..08f3a1d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -226,6 +227,26 @@ public class InternalTimerServiceImpl<K, N> implements InternalTimerService<N>,
 	}
 
 	@Override
+	public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		foreachTimer(consumer, eventTimeTimersQueue);
+	}
+
+	@Override
+	public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		foreachTimer(consumer, processingTimeTimersQueue);
+	}
+
+	private void foreachTimer(BiConsumerWithException<N, Long, Exception> consumer, KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> queue) throws Exception {
+		try (final CloseableIterator<TimerHeapInternalTimer<K, N>> iterator = queue.iterator()) {
+			while (iterator.hasNext()) {
+				final TimerHeapInternalTimer<K, N> timer = iterator.next();
+				keyContext.setCurrentKey(timer.getKey());
+				consumer.accept(timer.getNamespace(), timer.getTimestamp());
+			}
+		}
+	}
+
+	@Override
 	public void onProcessingTime(long time) throws Exception {
 		// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
 		// inside the callback.
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
index 99d2700..777ef2b 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerServiceImplTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.operators;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -560,6 +561,84 @@ public class InternalTimerServiceImplTest {
 		assertEquals(0, timerService.numEventTimeTimers());
 	}
 
+	/**
+	 * This also verifies that we iterate over all timers and set the key context on each element.
+	 */
+	@Test
+	public void testForEachEventTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		InternalTimerServiceImpl<Integer, String> timerService =
+			createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+		timers.add(Tuple3.of(key1, "ciao", 10L));
+		timers.add(Tuple3.of(key1, "hello", 10L));
+		timers.add(Tuple3.of(key2, "ciao", 10L));
+		timers.add(Tuple3.of(key2, "hello", 10L));
+
+		for (Tuple3<Integer, String, Long> timer : timers) {
+			keyContext.setCurrentKey(timer.f0);
+			timerService.registerEventTimeTimer(timer.f1, timer.f2);
+		}
+
+		Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+		timerService.forEachEventTimeTimer((namespace, timer) -> {
+			results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer));
+		});
+
+		Assert.assertEquals(timers, results);
+	}
+
+	/**
+	 * This also verifies that we iterate over all timers and set the key context on each element.
+	 */
+	@Test
+	public void testForEachProcessingTimeTimers() throws Exception {
+		@SuppressWarnings("unchecked")
+		Triggerable<Integer, String> mockTriggerable = mock(Triggerable.class);
+
+		TestKeyContext keyContext = new TestKeyContext();
+		TestProcessingTimeService processingTimeService = new TestProcessingTimeService();
+		InternalTimerServiceImpl<Integer, String> timerService =
+			createAndStartInternalTimerService(mockTriggerable, keyContext, processingTimeService, testKeyGroupRange, createQueueFactory());
+
+		// get two different keys
+		int key1 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		int key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		while (key2 == key1) {
+			key2 = getKeyInKeyGroupRange(testKeyGroupRange, maxParallelism);
+		}
+
+		Set<Tuple3<Integer, String, Long>> timers = new HashSet<>();
+		timers.add(Tuple3.of(key1, "ciao", 10L));
+		timers.add(Tuple3.of(key1, "hello", 10L));
+		timers.add(Tuple3.of(key2, "ciao", 10L));
+		timers.add(Tuple3.of(key2, "hello", 10L));
+
+		for (Tuple3<Integer, String, Long> timer : timers) {
+			keyContext.setCurrentKey(timer.f0);
+			timerService.registerProcessingTimeTimer(timer.f1, timer.f2);
+		}
+
+		Set<Tuple3<Integer, String, Long>> results = new HashSet<>();
+		timerService.forEachProcessingTimeTimer((namespace, timer) -> {
+			results.add(Tuple3.of((Integer) keyContext.getCurrentKey(), namespace, timer));
+		});
+
+		Assert.assertEquals(timers, results);
+	}
+
 	@Test
 	public void testSnapshotAndRestore() throws Exception {
 		testSnapshotAndRestore(InternalTimerServiceSerializationProxy.VERSION);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
index f8b095c..777dcbb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TestInternalTimerService.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.BiConsumerWithException;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -108,6 +109,22 @@ public class TestInternalTimerService<K, N> implements InternalTimerService<N> {
 		}
 	}
 
+	@Override
+	public void forEachEventTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		for (Timer<K, N> timer : watermarkTimers) {
+			keyContext.setCurrentKey(timer.getKey());
+			consumer.accept(timer.getNamespace(), timer.getTimestamp());
+		}
+	}
+
+	@Override
+	public void forEachProcessingTimeTimer(BiConsumerWithException<N, Long, Exception> consumer) throws Exception {
+		for (Timer<K, N> timer : processingTimeTimers) {
+			keyContext.setCurrentKey(timer.getKey());
+			consumer.accept(timer.getNamespace(), timer.getTimestamp());
+		}
+	}
+
 	public Collection<Timer<K, N>> advanceProcessingTime(long time) throws Exception {
 		List<Timer<K, N>> result = new ArrayList<>();
 


[flink] 01/03: [FLINK-13293] [state-processor-api] [build] Add state processor api to opt/ directory in flink-dist

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 752e7ae6a43455234606a9a7da50f40182b2db51
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Tue Jul 16 09:43:27 2019 -0700

    [FLINK-13293] [state-processor-api] [build] Add state processor api to opt/ directory in flink-dist
    
    This closes #9133.
---
 flink-dist/pom.xml                     | 7 +++++++
 flink-dist/src/main/assemblies/opt.xml | 8 ++++++++
 2 files changed, 15 insertions(+)

diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index f6c491b..8af6bd4 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -308,6 +308,13 @@ under the License.
 
 		<dependency>
 			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-state-processor-api_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-azure-fs-hadoop</artifactId>
 			<version>${project.version}</version>
 			<scope>provided</scope>
diff --git a/flink-dist/src/main/assemblies/opt.xml b/flink-dist/src/main/assemblies/opt.xml
index 1c2444e..b2f989a 100644
--- a/flink-dist/src/main/assemblies/opt.xml
+++ b/flink-dist/src/main/assemblies/opt.xml
@@ -75,6 +75,14 @@
 			<fileMode>0644</fileMode>
 		</file>
 
+		<!-- State Processor API -->
+		<file>
+			<source>../flink-libraries/flink-state-processing-api/target/flink-state-processor-api_${scala.binary.version}-${project.version}.jar</source>
+			<outputDirectory>opt/</outputDirectory>
+			<destName>flink-state-processor-api_${scala.binary.version}-${project.version}.jar</destName>
+			<fileMode>0644</fileMode>
+		</file>
+
 		<!-- Metrics -->
 		<file>
 			<source>../flink-metrics/flink-metrics-graphite/target/flink-metrics-graphite-${project.version}.jar</source>