Posted to commits@edgent.apache.org by dl...@apache.org on 2016/07/14 17:54:15 UTC
[2/4] incubator-quarks git commit: add behavior clarifications as affected by window type
add behavior clarifications as affected by window type
Project: http://git-wip-us.apache.org/repos/asf/incubator-quarks/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-quarks/commit/3e8a0d97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-quarks/tree/3e8a0d97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-quarks/diff/3e8a0d97
Branch: refs/heads/master
Commit: 3e8a0d9791c0e176398d4cea7a26ceb921de5503
Parents: 0b27016
Author: Dale LaBossiere <dl...@us.ibm.com>
Authored: Thu Jul 14 12:55:30 2016 -0400
Committer: Dale LaBossiere <dl...@us.ibm.com>
Committed: Thu Jul 14 12:55:30 2016 -0400
----------------------------------------------------------------------
.../src/main/java/quarks/topology/TWindow.java | 65 ++++++++++++--------
1 file changed, 39 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
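Illustration (not part of the commit): the Javadoc changes in the diff below distinguish how count-based partitions behave for aggregate() and batch(). The following is a minimal stand-alone sketch of those two count-based cases, assuming a single partition of Integer tuples. The class CountWindowSketch and its methods are hypothetical names made up for this illustration; they model the documented behavior only and are not the Quarks implementation or API. Time-based windows, which per the Javadoc can also invoke the function on eviction or with an empty list, are not modeled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical model of ONE count-based partition.
 * It records the list that the user function would be handed on each call.
 */
public class CountWindowSketch {

    /**
     * Sliding aggregation: the aggregator is called after each added tuple.
     * If the addition overflows the partition, the oldest tuple is evicted
     * before the aggregator sees the contents.
     */
    public static List<List<Integer>> aggregateCalls(List<Integer> tuples, int size) {
        Deque<Integer> partition = new ArrayDeque<>();
        List<List<Integer>> calls = new ArrayList<>();
        for (Integer t : tuples) {
            partition.addLast(t);
            if (partition.size() > size) {
                partition.removeFirst(); // evict first, then aggregate
            }
            calls.add(new ArrayList<>(partition)); // oldest to newest
        }
        return calls;
    }

    /**
     * Batched aggregation: the batcher is called only when the partition
     * is full, and the partition is cleared after the batch is processed.
     */
    public static List<List<Integer>> batchCalls(List<Integer> tuples, int size) {
        List<Integer> partition = new ArrayList<>();
        List<List<Integer>> calls = new ArrayList<>();
        for (Integer t : tuples) {
            partition.add(t);
            if (partition.size() == size) {
                calls.add(new ArrayList<>(partition));
                partition.clear(); // batches never overlap
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // prints [[1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5]]
        System.out.println(aggregateCalls(Arrays.asList(1, 2, 3, 4, 5), 3));
        // prints [[1, 2, 3], [4, 5, 6]]
        System.out.println(batchCalls(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

In this model the sliding case produces one call per tuple with overlapping contents, while the batch case produces one call per full partition and leaves the trailing tuple 7 pending until the partition fills again.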
http://git-wip-us.apache.org/repos/asf/incubator-quarks/blob/3e8a0d97/api/topology/src/main/java/quarks/topology/TWindow.java
----------------------------------------------------------------------
diff --git a/api/topology/src/main/java/quarks/topology/TWindow.java b/api/topology/src/main/java/quarks/topology/TWindow.java
index 62deab6..bfb392a 100644
--- a/api/topology/src/main/java/quarks/topology/TWindow.java
+++ b/api/topology/src/main/java/quarks/topology/TWindow.java
@@ -45,21 +45,27 @@ import quarks.function.Function;
*/
public interface TWindow<T, K> extends TopologyElement {
/**
- * Declares a stream that is a continuous aggregation of
- * partitions in this window. Each time the contents of a partition is updated by a new
- * tuple being added to it, or tuples being evicted
- * {@code aggregator.apply(tuples, key)} is called, where {@code tuples} is an
- * {@code List} that containing all the tuples in the partition.
- * The {@code List} is stable during the method call, and returns the
- * tuples in order of insertion into the window, from oldest to newest.
- * The list will be empty if the last tuple in the partition has been evicted.
- * <BR>
- * The returned stream will contain a tuple that is the result of
- * {@code aggregator.apply(tuples, key)} when the return is not {@code null}.
- * If {@code aggregator.apply(tuples, key)} returns {@code null} then
- * no tuple is submitted to the returned stream.
- * <BR>
- * Thus the returned stream will contain a sequence of tuples that where the
+ * Declares a stream that is a continuous, sliding, aggregation of
+ * partitions in this window.
+ * <P>
+ * Changes in a partition's contents trigger an invocation of
+ * {@code aggregator.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest:
+ * <UL>
+ * <LI>Count-based window: the aggregator is called after each
+ * tuple added to a partition. When an addition results in a tuple
+ * being evicted, the eviction occurs before the aggregator is called.</LI>
+ * <LI>Time-based window: the aggregator is called after each tuple
+ * added to a partition. The aggregator is also called
+ * each time one or more tuples are evicted from a partition
+ * (multiple tuples may be evicted at once). The list will be
+ * empty if the eviction results in an empty partition.</LI>
+ * </UL>
+ * A non-null {@code aggregator} result is added to the returned stream.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
* most recent tuple represents the most up to date aggregation of a
* partition.
*
@@ -72,17 +78,24 @@ public interface TWindow<T, K> extends TopologyElement {
/**
* Declares a stream that represents a batched aggregation of
- * partitions in this window. Each time the contents of a partition equals
- * the window size or the time duration,
- * {@code batcher.apply(tuples, key)} is called, where {@code tuples} is an
- * {@code List} that containing all the tuples in the partition.
- * The {@code List} is stable during the method call, and returns the
- * tuples in order of insertion into the window, from oldest to newest. <BR>
- * Thus the returned stream will contain a sequence of tuples that where
- * each tuple represents the output of the most recent batch of a partition.
- * The tuples contained in a partition during a batch do not overlap with
- * the tuples in any subsequent batch. After a partition is batched, its
- * contents are cleared.
+ * partitions in this window.
+ * <P>
+ * Each partition "batch" triggers an invocation of
+ * {@code batcher.apply(tuples, key)}, where {@code tuples} is
+ * a {@code List<T>} containing all the tuples in the partition in
+ * insertion order from oldest to newest:
+ * <UL>
+ * <LI>Count-based window: a batch occurs when the partition is full.</LI>
+ * <LI>Time-based window: a batch occurs every "time" period units. The
+ * list will be empty if no tuples have been received during the period.</LI>
+ * </UL>
+ * A non-null {@code batcher} result is added to the returned stream.
+ * The partition's contents are cleared after a batch is processed.
+ * </P>
+ * <P>
+ * Thus the returned stream will contain a sequence of tuples where the
+ * most recent tuple represents the most up to date aggregation of a
+ * partition.
*
* @param <U> Tuple type
* @param batcher