You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/23 12:58:41 UTC

[4/6] flink git commit: [FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role

[FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/feeb9d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/feeb9d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/feeb9d7b

Branch: refs/heads/release-1.2
Commit: feeb9d7bb7afb56749be501afb79f14617c455d7
Parents: c291fe6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 11:24:01 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/WindowedStream.java          |  4 +-
 .../SlidingAlignedProcessingTimeWindows.java    | 41 +++++++++++++-------
 .../TumblingAlignedProcessingTimeWindows.java   | 37 ++++++++++++------
 3 files changed, 54 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index cd8385b..51712e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1015,7 +1015,7 @@ public class WindowedStream<T, K, W extends Window> {
 			TypeInformation<R> resultType,
 			String functionName) {
 
-		if (windowAssigner.getClass().equals(SlidingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+		if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
 			SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSlide();
@@ -1046,7 +1046,7 @@ public class WindowedStream<T, K, W extends Window> {
 						windowLength, windowSlide);
 				return input.transform(opName, resultType, op);
 			}
-		} else if (windowAssigner.getClass().equals(TumblingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+		} else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
 			TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
 			final long windowLength = timeWindows.getSize();
 			final long windowSlide = timeWindows.getSize();

http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
index a0e0bcb..984c31a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
 
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 /**
- * A processing time sliding {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
- *
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
- *
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
+ * 
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly 
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
+ * 
+ * <ul>
+ *     <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ *     <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ *     <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ *         operator.</li>
+ * </ul>
+ * 
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ * 
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
 public final class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
 
 	private static final long serialVersionUID = 3695562702662473688L;

http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
index e1a8101..c00eb7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
 
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 /**
- * A processing time tumbling {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
  *
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly 
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
  *
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * <ul>
+ *     <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ *     <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ *     <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ *         operator.</li>
+ * </ul>
+ *
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ *
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
 public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
 
 	private static final long serialVersionUID = -6217477609512299842L;