You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/23 12:58:41 UTC
[4/6] flink git commit: [FLINK-5532] [streaming api] Improve JavaDocs
for assigners for Fast Aligned Windows to clarify role
[FLINK-5532] [streaming api] Improve JavaDocs for assigners for Fast Aligned Windows to clarify role
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/feeb9d7b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/feeb9d7b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/feeb9d7b
Branch: refs/heads/release-1.2
Commit: feeb9d7bb7afb56749be501afb79f14617c455d7
Parents: c291fe6
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Jan 23 11:24:01 2017 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Jan 23 13:56:06 2017 +0100
----------------------------------------------------------------------
.../api/datastream/WindowedStream.java | 4 +-
.../SlidingAlignedProcessingTimeWindows.java | 41 +++++++++++++-------
.../TumblingAlignedProcessingTimeWindows.java | 37 ++++++++++++------
3 files changed, 54 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index cd8385b..51712e1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -1015,7 +1015,7 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<R> resultType,
String functionName) {
- if (windowAssigner.getClass().equals(SlidingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+ if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSlide();
@@ -1046,7 +1046,7 @@ public class WindowedStream<T, K, W extends Window> {
windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
- } else if (windowAssigner.getClass().equals(TumblingAlignedProcessingTimeWindows.class) && trigger == null && evictor == null) {
+ } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) {
TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSize();
http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
index a0e0bcb..984c31a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
- * A processing time sliding {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
- *
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
- *
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
+ *
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
+ *
+ * <ul>
+ * <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ * <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ * <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ * operator.</li>
+ * </ul>
+ *
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ *
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
public final class SlidingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
private static final long serialVersionUID = 3695562702662473688L;
http://git-wip-us.apache.org/repos/asf/flink/blob/feeb9d7b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
index e1a8101..c00eb7d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingAlignedProcessingTimeWindows.java
@@ -18,23 +18,36 @@
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
- * A processing time tumbling {@link WindowAssigner window assigner} used to perform windowing using the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator
- * AccumulatingProcessingTimeWindowOperator} and the
- * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator
- * AggregatingProcessingTimeWindowOperator}.
+ * This is a special window assigner used to tell the system to use the
+ * <i>"Fast Aligned Processing Time Window Operator"</i> for windowing.
*
- * <p>
- * With this assigner, the {@code trigger} used is a
- * {@link org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger
- * ProcessingTimeTrigger} and no {@code evictor} can be specified.
+ * <p>Prior Flink versions used that operator automatically for simple processing time
+ * windows (tumbling and sliding) when no custom trigger and no evictor was specified.
+ * In the current Flink version, that operator is only used when programs explicitly
+ * specify this window assigner. This is only intended for special cases where programs relied on
+ * the better performance of the fast aligned window operator, and are willing to accept the lack
+ * of support for various features as indicated below:
*
- * <p>
- * <b>WARNING:</b> Bear in mind that no rescaling and no backwards compatibility is supported.
- * */
+ * <ul>
+ * <li>No custom state backend can be selected, the operator always stores data on the Java heap.</li>
+ * <li>The operator does not support key groups, meaning it cannot change the parallelism.</li>
+ * <li>Future versions of Flink may not be able to resume from checkpoints/savepoints taken by this
+ * operator.</li>
+ * </ul>
+ *
+ * <p>Future implementation plans: We plan to add some of the optimizations used by this operator to
+ * the general window operator, so that future versions of Flink will not have the performance/functionality
+ * trade-off any more.
+ *
+ * <p>Note on implementation: The concrete operator instantiated by this assigner is either the
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator}
+ * or {@link org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator}.
+ */
+@PublicEvolving
public final class TumblingAlignedProcessingTimeWindows extends BaseAlignedWindowAssigner {
private static final long serialVersionUID = -6217477609512299842L;