You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/05/07 17:21:59 UTC

[GitHub] [flink] roeyshemtov commented on a change in pull request #11972: [FLINK-17058] Adding ProcessingTimeoutTrigger of nested triggers.

roeyshemtov commented on a change in pull request #11972:
URL: https://github.com/apache/flink/pull/11972#discussion_r421667955



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeoutTrigger.java
##########
@@ -0,0 +1,100 @@
+package org.apache.flink.streaming.api.windowing.triggers;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+/**
+ * A {@link Trigger} that can turn any {@link Trigger} into a timeout {@code Trigger}.
+ *
+ * <p>Each record arriving will emit a ProcessingTimeTimer withing the interval,
+ * you can control if the timer will be registered periodic for each event arriving,
+ * by the continualInterval flag.
+ * you can control if the state will be cleared after reach the timeout, by the
+ * shouldClearAtTimeout flag.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
+
+@PublicEvolving
+public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {
+
+	private static final long serialVersionUID = 1L;
+
+	private Trigger<T, W> nestedTrigger;
+	private final long interval;
+	private final boolean continualInterval;
+	private final boolean shouldClearAtTimeout;
+
+	private final ValueStateDescriptor<Long> timeoutStateDesc;
+
+	private ProcessingTimeoutTrigger(Trigger<T, W> nestedTrigger, long interval, boolean continualInterval, boolean shouldClearAtTimeout) {
+		this.nestedTrigger = nestedTrigger;
+		this.interval = interval;
+		this.continualInterval = continualInterval;
+		this.shouldClearAtTimeout = shouldClearAtTimeout;
+		this.timeoutStateDesc = new ValueStateDescriptor<Long>("timeout", LongSerializer.INSTANCE);
+	}
+
+	@Override
+	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
+		TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
+		if (triggerResult.isFire()) {
+			this.clear(window, ctx);
+			return triggerResult;
+		}
+		
+		ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
+		long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
+
+		if (timeoutState.value() != null && continualInterval) {
+			ctx.deleteProcessingTimeTimer(timeoutState.value());
+			timeoutState.clear();
+		}
+
+		if (timeoutState.value() == null) {

Review comment:
       You absolutely right, i changed it.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org