You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/24 18:19:55 UTC

[2/6] flink git commit: [FLINK-5247] [streaming api] Fix checks for allowed lateness in windowed streams

[FLINK-5247] [streaming api] Fix checks for allowed lateness in windowed streams

Also, fix outdated documentation.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87af8419
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87af8419
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87af8419

Branch: refs/heads/master
Commit: 87af84194911eb1e0c3b3a894bb3f04b628fbf11
Parents: acfeeaf
Author: Rohit Agarwal <mi...@gmail.com>
Authored: Sat Dec 3 12:15:45 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/AllWindowedStream.java      | 6 ++----
 .../apache/flink/streaming/api/datastream/WindowedStream.java  | 6 ++----
 .../apache/flink/streaming/api/scala/AllWindowedStream.scala   | 2 +-
 .../org/apache/flink/streaming/api/scala/WindowedStream.scala  | 2 +-
 4 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31ea001..bd11de3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -123,11 +123,9 @@ public class AllWindowedStream<T, W extends Window> {
 	@PublicEvolving
 	public AllWindowedStream<T, W> allowedLateness(Time lateness) {
 		long millis = lateness.toMilliseconds();
-		if (allowedLateness < 0) {
+		if (millis < 0) {
 			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
-			throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
-		} else {
+		} else if (windowAssigner.isEventTime()) {
 			this.allowedLateness = millis;
 		}
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0a02885..c360ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -146,11 +146,9 @@ public class WindowedStream<T, K, W extends Window> {
 	@PublicEvolving
 	public WindowedStream<T, K, W> allowedLateness(Time lateness) {
 		long millis = lateness.toMilliseconds();
-		if (allowedLateness < 0) {
+		if (millis < 0) {
 			throw new IllegalArgumentException("The allowed lateness cannot be negative.");
-		} else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
-			throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
-		} else {
+		} else if (windowAssigner.isEventTime()) {
 			this.allowedLateness = millis;
 		}
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 83104e8..324689a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -58,7 +58,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
 
   /**
     * Sets the allowed lateness to a user-specified value.
-    * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+    * If not explicitly set, the allowed lateness is [[0L]].
     * Setting the allowed lateness is only valid for event-time windows.
     * If a value different than 0 is provided with a processing-time
     * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],

http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 76d9cda..db187ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -61,7 +61,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
 
   /**
     * Sets the allowed lateness to a user-specified value.
-    * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+    * If not explicitly set, the allowed lateness is [[0L]].
     * Setting the allowed lateness is only valid for event-time windows.
     * If a value different than 0 is provided with a processing-time
     * [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],