You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:47 UTC
[13/13] flink git commit: [FLINK-2807] Add Javadocs for new windowing
semantics/internals
[FLINK-2807] Add Javadocs for new windowing semantics/internals
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62df0a03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62df0a03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62df0a03
Branch: refs/heads/master
Commit: 62df0a0349b276d4a5b7d9954d2a07f367a61d2d
Parents: 8c2c769
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Oct 3 16:47:28 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200
----------------------------------------------------------------------
.../api/windowing/assigners/GlobalWindows.java | 8 +++
.../assigners/SlidingProcessingTimeWindows.java | 13 ++++
.../windowing/assigners/SlidingTimeWindows.java | 13 ++++
.../TumblingProcessingTimeWindows.java | 13 ++++
.../assigners/TumblingTimeWindows.java | 13 ++++
.../api/windowing/assigners/WindowAssigner.java | 22 ++++++
.../api/windowing/evictors/CountEvictor.java | 10 +++
.../api/windowing/evictors/DeltaEvictor.java | 16 +++++
.../api/windowing/evictors/Evictor.java | 25 ++++++-
.../api/windowing/evictors/TimeEvictor.java | 11 +++
.../ContinuousProcessingTimeTrigger.java | 12 ++++
.../triggers/ContinuousWatermarkTrigger.java | 14 ++++
.../api/windowing/triggers/CountTrigger.java | 11 +++
.../api/windowing/triggers/DeltaTrigger.java | 20 ++++++
.../triggers/ProcessingTimeTrigger.java | 7 ++
.../api/windowing/triggers/PurgingTrigger.java | 15 +++++
.../api/windowing/triggers/Trigger.java | 70 ++++++++++++++++++--
.../windowing/triggers/WatermarkTrigger.java | 9 +++
.../operators/BucketStreamSortOperator.java | 16 ++++-
.../runtime/operators/Triggerable.java | 7 +-
.../EvictingNonKeyedWindowOperator.java | 10 ++-
.../windowing/EvictingWindowOperator.java | 12 ++++
.../windowing/NonKeyedWindowOperator.java | 9 +++
.../operators/windowing/WindowOperator.java | 60 +++++++++++++++--
.../windowing/buffers/EvictingWindowBuffer.java | 15 ++++-
.../windowing/buffers/HeapWindowBuffer.java | 8 ++-
.../buffers/PreAggregatingHeapWindowBuffer.java | 9 ++-
.../windowing/buffers/WindowBuffer.java | 38 +++++++++--
.../windowing/buffers/WindowBufferFactory.java | 24 +++++++
29 files changed, 483 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 391a6a4..52c8f55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -23,6 +23,14 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import java.util.Collection;
import java.util.Collections;
+/**
+ * A {@link WindowAssigner} that assigns all elements to the same global window.
+ *
+ * <p>
+ * Use this if you want to use a {@link Trigger} and
+ * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to do flexible, policy-based
+ * windows.
+ */
public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 6fc79b0..65d7641 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -28,6 +28,19 @@ import java.io.ObjectInputStream;
import java.util.Collection;
import java.util.List;
+/**
+ * A {@link WindowAssigner} that windows elements into sliding, time-based windows. The windowing
+ * is based on system time. Windows can possibly overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ * keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS)));
+ * } </pre>
+ */
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 49bff05..52ae356 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -26,6 +26,19 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+/**
+ * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
+ * elements. Windows can possibly overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ * keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS)));
+ * } </pre>
+ */
public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 1f2eebf..41f6362 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -25,6 +25,19 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
+/**
+ * A {@link WindowAssigner} that windows elements into time-based windows. The windowing is
+ * based on system time. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ * keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES)));
+ * } </pre>
+ */
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 019f45b..b6022b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -25,6 +25,19 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import java.util.Collection;
import java.util.Collections;
+/**
+ * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowed =
+ * keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES)));
+ * } </pre>
+ */
public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 5996426..105caa6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -23,10 +23,32 @@ import scala.Serializable;
import java.util.Collection;
+/**
+ * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
+ *
+ * <p>
+ * In a window operation, elements are grouped by their key (if available) and by the windows to
+ * which they were assigned. The set of elements with the same key and window is called a pane.
+ * When a {@link Trigger} decides that a certain pane should fire, the
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
+ * to produce output elements for that pane.
+ *
+ * @param <T> The type of elements that this WindowAssigner can assign windows to.
+ * @param <W> The type of {@code Window} that this assigner assigns.
+ */
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
+ /**
+ * Returns a {@code Collection} of windows that should be assigned to the element.
+ *
+ * @param element The element to which windows should be assigned.
+ * @param timestamp The timestamp of the element.
+ */
public abstract Collection<W> assignWindows(T element, long timestamp);
+ /**
+ * Returns the default trigger associated with this {@code WindowAssigner}.
+ */
public abstract Trigger<T, W> getDefaultTrigger();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index 04636ee..0a078e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -20,6 +20,11 @@ package org.apache.flink.streaming.api.windowing.evictors;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+/**
+ * An {@link Evictor} that keeps only a certain number of elements.
+ *
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
private static final long serialVersionUID = 1L;
@@ -38,6 +43,11 @@ public class CountEvictor<W extends Window> implements Evictor<Object, W> {
}
}
+ /**
+ * Creates a {@code CountEvictor} that keeps the given number of elements.
+ *
+ * @param maxCount The number of elements to keep in the pane.
+ */
public static <W extends Window> CountEvictor<W> of(long maxCount) {
return new CountEvictor<>(maxCount);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
index c7872ce..0083a04 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -22,6 +22,16 @@ import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+/**
+ * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
+ *
+ * <p>
+ * Eviction starts from the first element of the buffer and removes all elements from the buffer
+ * which have a higher delta than the threshold. As soon as there is an element with a lower delta,
+ * the eviction stops.
+ *
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
private static final long serialVersionUID = 1L;
@@ -52,6 +62,12 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
return "DeltaEvictor(" + deltaFunction + ", " + threshold + ")";
}
+ /**
+ * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
+ *
+ * @param threshold The threshold
+ * @param deltaFunction The {@code DeltaFunction}
+ */
public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
return new DeltaEvictor<>(threshold, deltaFunction);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index db04ac4..1a6c5c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -21,8 +21,31 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import scala.Serializable;
+/**
+ * An {@code Evictor} can remove elements from a pane after window evaluation was triggered
+ * by a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger} and before the
+ * pane is processed.
+ *
+ * <p>
+ * A pane is the bucket of elements that have the same key (assigned by the
+ * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+ * be in multiple panes if it was assigned to multiple windows by the
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+ * have their own instance of the {@code Evictor}.
+ *
+ * @param <T> The type of elements that this {@code Evictor} can evict.
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
public interface Evictor<T, W extends Window> extends Serializable {
- public abstract int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+ /**
+ * Computes how many elements should be removed from the pane. The result specifies how
+ * many elements should be removed from the beginning.
+ *
+ * @param elements The elements currently in the pane.
+ * @param size The current number of elements in the pane.
+ * @param window The {@link Window}
+ */
+ int evict(Iterable<StreamRecord<T>> elements, int size, W window);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 2965214..5004c42 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -22,6 +22,12 @@ import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+/**
+ * An {@link Evictor} that keeps elements for a certain amount of time. Elements older
+ * than {@code current_time - keep_time} are evicted.
+ *
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
private static final long serialVersionUID = 1L;
@@ -55,6 +61,11 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
return windowSize;
}
+ /**
+ * Creates a {@code TimeEvictor} that keeps elements for the given amount of time.
+ *
+ * @param windowSize The amount of time for which to keep elements.
+ */
public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
return new TimeEvictor<>(windowSize.toMilliseconds());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 24e8ce3..f23f6ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -21,6 +21,12 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
+/**
+ * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
+ * system time.
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;
@@ -80,6 +86,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
return "ContinuousProcessingTimeTrigger(" + interval + ")";
}
+ /**
+ * Creates a trigger that continuously fires based on the given interval.
+ *
+ * @param interval The time interval at which to fire.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index e11ceba..02ea81d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -21,6 +21,14 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
+/**
+ * A {@link Trigger} that continuously fires based on a given time interval. This fires based
+ * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;
@@ -66,6 +74,12 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
return interval;
}
+ /**
+ * Creates a trigger that continuously fires based on the given interval.
+ *
+ * @param interval The time interval at which to fire.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index a51fae6..53480fe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -19,6 +19,11 @@ package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.streaming.api.windowing.windows.Window;
+/**
+ * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public class CountTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;
@@ -55,6 +60,12 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
return "CountTrigger(" + maxCount + ")";
}
+ /**
+ * Creates a trigger that fires once the number of elements in a pane reaches the given count.
+ *
+ * @param maxCount The count of elements at which to fire.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public static <W extends Window> CountTrigger<W> of(long maxCount) {
return new CountTrigger<>(maxCount);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index ecd7ed0..cf4cf0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -20,6 +20,16 @@ package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
+/**
+ * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
+ *
+ * <p>
+ * This trigger calculates a delta between the data point which triggered last
+ * and the currently arrived data point. It triggers if the delta is higher than
+ * a specified threshold.
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
private static final long serialVersionUID = 1L;
@@ -60,6 +70,16 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
return "DeltaTrigger(" + deltaFunction + ", " + threshold + ")";
}
+ /**
+ * Creates a delta trigger from the given threshold and {@code DeltaFunction}.
+ *
+ * @param threshold The threshold at which to trigger.
+ * @param deltaFunction The delta function to use
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ * @return The new {@code DeltaTrigger}.
+ */
public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
return new DeltaTrigger<>(threshold, deltaFunction);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index f693a67..cc3440c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -19,6 +19,10 @@ package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+/**
+ * A {@link Trigger} that fires once the current system time passes the end of the window
+ * to which a pane belongs.
+ */
public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
@@ -50,6 +54,9 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
return "ProcessingTimeTrigger()";
}
+ /**
+ * Creates a new trigger that fires once system time passes the end of the window.
+ */
public static ProcessingTimeTrigger create() {
return new ProcessingTimeTrigger();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 88e22cd..1c896a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -20,6 +20,16 @@ package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.windows.Window;
+/**
+ * A trigger that can turn any {@link Trigger} into a purging {@code Trigger}.
+ *
+ * <p>
+ * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE}
+ * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult}.
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
private static final long serialVersionUID = 1L;
@@ -65,6 +75,11 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
return "PurgingTrigger(" + nestedTrigger.toString() + ")";
}
+ /**
+ * Creates a new purging trigger from the given {@code Trigger}.
+ *
+ * @param nestedTrigger The trigger that is wrapped by this purging trigger
+ */
public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
return new PurgingTrigger<>(nestedTrigger);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index b04aacf..97d9ba5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -20,21 +20,81 @@ package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.streaming.api.windowing.windows.Window;
import scala.Serializable;
+/**
+ * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
+ * results for that part of the window.
+ *
+ * <p>
+ * A pane is the bucket of elements that have the same key (assigned by the
+ * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+ * be in multiple panes if it was assigned to multiple windows by the
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+ * have their own instance of the {@code Trigger}.
+ *
+ * @param <T> The type of elements on which this {@code Trigger} works.
+ * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
+ */
public interface Trigger<T, W extends Window> extends Serializable {
- public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+ /**
+ * Called for every element that gets added to a pane. The result of this will determine
+ * whether the pane is evaluated to emit results.
+ *
+ * @param element The element that arrived.
+ * @param timestamp The timestamp of the element that arrived.
+ * @param window The window to which this pane belongs.
+ * @param ctx A context object that can be used to register timer callbacks.
+ */
+ TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
- public TriggerResult onTime(long time, TriggerContext ctx);
+ /**
+ * Called when a timer that was set using the trigger context fires.
+ *
+ * @param time The timestamp at which the timer fired.
+ * @param ctx A context object that can be used to register timer callbacks.
+ */
+ TriggerResult onTime(long time, TriggerContext ctx);
- public Trigger<T, W> duplicate();
+ /**
+ * Creates a duplicate of the {@code Trigger} without the state of the original {@code Trigger}.
+ * @return The duplicate {@code Trigger} object.
+ */
+ Trigger<T, W> duplicate();
- public static enum TriggerResult {
+ /**
+ * Result type for trigger methods. This determines what happens with the window.
+ *
+ * <p>
+ * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
+ * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
+ * are purged. On {@code CONTINUE} nothing happens, processing continues.
+ */
+ enum TriggerResult {
CONTINUE, FIRE_AND_PURGE, FIRE
}
- public interface TriggerContext {
+ /**
+ * A context object that is given to {@code Trigger} methods to allow them to register timer
+ * callbacks.
+ */
+ interface TriggerContext {
+
+ /**
+ * Register a system time callback. When the current system time passes the specified
+ * time {@link #onTime(long, TriggerContext)} is called.
+ *
+ * @param time The time at which to invoke {@link #onTime(long, TriggerContext)}
+ */
void registerProcessingTimeTimer(long time);
+ /**
+ * Register a watermark callback. When the current watermark passes the specified
+ * time {@link #onTime(long, TriggerContext)} is called.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param time The watermark at which to invoke {@link #onTime(long, TriggerContext)}
+ */
void registerWatermarkTimer(long time);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
index 6ba8890..5d66ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -19,6 +19,12 @@ package org.apache.flink.streaming.api.windowing.triggers;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+/**
+ * A {@link Trigger} that fires once the watermark passes the end of the window
+ * to which a pane belongs.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ */
public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
@@ -50,6 +56,9 @@ public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
return "WatermarkTrigger()";
}
+ /**
+ * Creates a new trigger that fires once the watermark passes the end of the window.
+ */
public static WatermarkTrigger create() {
return new WatermarkTrigger();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
index 145ad25..017c8ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -33,6 +33,13 @@ import java.util.Map;
import java.util.Set;
+/**
+ * An operator that can sort a stream based on timestamps. Arriving elements will be put into
+ * buckets based on their timestamp. Sorting and emission of sorted elements happens once
+ * the watermark passes the end of a bucket.
+ *
+ * @param <T> The type of the elements on which this operator works.
+ */
public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
private static final long serialVersionUID = 1L;
@@ -40,8 +47,13 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
private transient Map<Long, List<StreamRecord<T>>> buckets;
- public BucketStreamSortOperator(long granularity) {
- this.granularity = granularity;
+ /**
+ * Creates a new sorting operator that creates buckets with the given interval.
+ *
+ * @param interval The size (in time) of one bucket.
+ */
+ public BucketStreamSortOperator(long interval) {
+ this.granularity = interval;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
index ac1a543..50d1cb6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
@@ -19,14 +19,13 @@
package org.apache.flink.streaming.runtime.operators;
/**
- * This interface must be implemented by objects that are triggered by a
- * {@link TriggerTimer}.
+ * This interface must be implemented by objects that are triggered by the timer service available
+ * to stream operators in {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}.
*/
public interface Triggerable {
/**
- * This method is invoked by the {@link TriggerTimer}
- * and given the timestamp for which the trigger was scheduled.
+ * This method is invoked with the timestamp for which the trigger was scheduled.
* <p>
* If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due
* to a garbage collection), the timestamp supplied to this function will still be the original
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 53df838..31c7fed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -30,7 +30,15 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+/**
+ * Evicting window operator for non-keyed windows.
+ *
+ * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
+ *
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 334eb54..49d58e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -33,6 +33,18 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
+/**
+ * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
+ *
+ * <p>
+ * The {@code Evictor} is used to evict elements from panes before processing a window and after
+ * a {@link Trigger} has fired.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index d48643d..a80242d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -45,6 +45,15 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
+/**
+ * Window operator for non-keyed windows.
+ *
+ * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
+ *
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
public class NonKeyedWindowOperator<IN, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2d4635f..548afb3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -46,6 +46,32 @@ import java.util.Collection;
import java.util.Map;
import java.util.Set;
+/**
+ * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
+ * {@link Trigger}.
+ *
+ * <p>
+ * When an element arrives it gets assigned a key using a {@link KeySelector} and it gets
+ * assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
+ * is put into panes. A pane is the bucket of elements that have the same key and same
+ * {@code Window}. An element can be in multiple panes if it was assigned to multiple windows by the
+ * {@code WindowAssigner}.
+ *
+ * <p>
+ * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
+ * the contents of the pane should be processed to emit results. When a trigger fires,
+ * the given {@link WindowFunction} is invoked to produce the results that are emitted for
+ * the pane to which the {@code Trigger} belongs.
+ *
+ * <p>
+ * This operator also needs a {@link WindowBufferFactory} to create a buffer for storing the
+ * elements of each pane.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
public class WindowOperator<K, IN, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
@@ -54,24 +80,47 @@ public class WindowOperator<K, IN, OUT, W extends Window>
private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
private final WindowAssigner<? super IN, W> windowAssigner;
+
private final KeySelector<IN, K> keySelector;
private final Trigger<? super IN, ? super W> triggerTemplate;
+
private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
+ /**
+ * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
+ * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
+ */
protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
+ /**
+ * Processing time timers that are currently in-flight.
+ */
private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+
+ /**
+ * Currently waiting watermark callbacks.
+ */
private transient Map<Long, Set<TriggerContext>> watermarkTimers;
+ /**
+ * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
+ */
protected transient TimestampedCollector<OUT> timestampedCollector;
+ /**
+ * If this is true, the current processing time is set as the timestamp of incoming elements.
+ * This is for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+ * if eviction should happen based on processing time.
+ */
private boolean setProcessingTime = false;
private TypeSerializer<IN> inputSerializer;
+ /**
+ * Creates a new {@code WindowOperator} based on the given policies and user functions.
+ */
public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
KeySelector<IN, K> keySelector,
WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
@@ -245,6 +294,10 @@ public class WindowOperator<K, IN, OUT, W extends Window>
}
}
+ /**
+ * A context object that is given to {@code Trigger} functions to allow them to register
+ * timer/watermark callbacks.
+ */
protected class TriggerContext implements Trigger.TriggerContext {
Trigger<? super IN, ? super W> trigger;
K key;
@@ -312,9 +365,4 @@ public class WindowOperator<K, IN, OUT, W extends Window>
public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
return windowBufferFactory;
}
-
- @VisibleForTesting
- public boolean isSetProcessingTime() {
- return setProcessingTime;
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
index 50e392b..28365e1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -17,6 +17,19 @@
*/
package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+/**
+ * A {@code WindowBuffer} that can also evict elements from the buffer. The order in which
+ * the elements are added is preserved. Elements can only be evicted starting from the beginning of
+ * the buffer.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+
public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
- public boolean removeElements(int count);
+
+ /**
+ * Removes the given number of elements, starting from the beginning.
+ * @param count The number of elements to remove.
+ */
+ void removeElements(int count);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
index 092718a..f9f8b26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
@@ -25,6 +25,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.ArrayDeque;
+/**
+ * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
private static final long serialVersionUID = 1L;
@@ -40,12 +45,11 @@ public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
}
@Override
- public boolean removeElements(int count) {
+ public void removeElements(int count) {
// TODO determine if this can be done in a better way
for (int i = 0; i < count; i++) {
elements.removeFirst();
}
- return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
index 85f90b0..37be8f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
@@ -25,6 +25,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.util.Collections;
+/**
+ * A {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
+ * {@link ReduceFunction} to pre-aggregate elements that are added to the buffer.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+
public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
private static final long serialVersionUID = 1L;
@@ -85,7 +92,7 @@ public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
@Override
public PreAggregatingHeapWindowBuffer<T> create() {
- return new PreAggregatingHeapWindowBuffer<T>(reduceFunction);
+ return new PreAggregatingHeapWindowBuffer<>(reduceFunction);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
index 8c891d5..b111667 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -18,17 +18,47 @@
package org.apache.flink.streaming.runtime.operators.windowing.buffers;
+import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import java.io.Serializable;
+/**
+ * A {@code WindowBuffer} is used by
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} to store
+ * the elements of one pane.
+ *
+ * <p>
+ * A pane is the bucket of elements that have the same key (assigned by the
+ * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+ * be in multiple panes if it was assigned to multiple windows by the
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+ * have their own instance of the {@code WindowBuffer}.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
public interface WindowBuffer<T> extends Serializable {
- public void storeElement(StreamRecord<T> element) throws Exception;
+ /**
+ * Adds the element to the buffer.
+ *
+ * @param element The element to add.
+ */
+ void storeElement(StreamRecord<T> element) throws Exception;
- public Iterable<StreamRecord<T>> getElements();
+ /**
+ * Returns all elements that are currently in the buffer.
+ */
+ Iterable<StreamRecord<T>> getElements();
- public Iterable<T> getUnpackedElements();
+ /**
+ * Returns all elements that are currently in the buffer. This will unwrap the contained
+ * elements from their {@link StreamRecord}.
+ */
+ Iterable<T> getUnpackedElements();
- public int size();
+ /**
+ * Returns the number of elements that are currently in the buffer.
+ */
+ int size();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
index 4a7f6df..4bcdf09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -22,9 +22,33 @@ import org.apache.flink.configuration.Configuration;
import java.io.Serializable;
+/**
+ * A factory for {@link WindowBuffer WindowBuffers}.
+ *
+ * @param <T> The type of elements that the created {@code WindowBuffer} can store.
+ * @param <B> The type of the created {@code WindowBuffer}
+ */
public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
+
+ /**
+ * Sets the {@link RuntimeContext} that is used to initialize any user functions
+ * inside the created buffers.
+ */
void setRuntimeContext(RuntimeContext ctx);
+
+ /**
+ * Calls {@code open()} on any user functions inside the buffer.
+ */
void open(Configuration config) throws Exception;
+
+ /**
+ * Calls {@code close()} on any user functions inside the buffer.
+ */
+
void close() throws Exception;
+
+ /**
+ * Creates a new {@code WindowBuffer}.
+ */
B create();
}