You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2017/01/24 18:19:55 UTC
[2/6] flink git commit: [FLINK-5247] [streaming api] Fix checks for
allowed lateness in windowed streams
[FLINK-5247] [streaming api] Fix checks for allowed lateness in windowed streams
Also, fix outdated documentation.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/87af8419
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/87af8419
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/87af8419
Branch: refs/heads/master
Commit: 87af84194911eb1e0c3b3a894bb3f04b628fbf11
Parents: acfeeaf
Author: Rohit Agarwal <mi...@gmail.com>
Authored: Sat Dec 3 12:15:45 2016 -0800
Committer: Stephan Ewen <se...@apache.org>
Committed: Tue Jan 24 17:20:11 2017 +0100
----------------------------------------------------------------------
.../flink/streaming/api/datastream/AllWindowedStream.java | 6 ++----
.../apache/flink/streaming/api/datastream/WindowedStream.java | 6 ++----
.../apache/flink/streaming/api/scala/AllWindowedStream.scala | 2 +-
.../org/apache/flink/streaming/api/scala/WindowedStream.scala | 2 +-
4 files changed, 6 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 31ea001..bd11de3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -123,11 +123,9 @@ public class AllWindowedStream<T, W extends Window> {
@PublicEvolving
public AllWindowedStream<T, W> allowedLateness(Time lateness) {
long millis = lateness.toMilliseconds();
- if (allowedLateness < 0) {
+ if (millis < 0) {
throw new IllegalArgumentException("The allowed lateness cannot be negative.");
- } else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
- throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
- } else {
+ } else if (windowAssigner.isEventTime()) {
this.allowedLateness = millis;
}
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 0a02885..c360ea1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -146,11 +146,9 @@ public class WindowedStream<T, K, W extends Window> {
@PublicEvolving
public WindowedStream<T, K, W> allowedLateness(Time lateness) {
long millis = lateness.toMilliseconds();
- if (allowedLateness < 0) {
+ if (millis < 0) {
throw new IllegalArgumentException("The allowed lateness cannot be negative.");
- } else if (allowedLateness != 0 && !windowAssigner.isEventTime()) {
- throw new IllegalArgumentException("Setting the allowed lateness is only valid for event-time windows.");
- } else {
+ } else if (windowAssigner.isEventTime()) {
this.allowedLateness = millis;
}
return this;
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
index 83104e8..324689a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala
@@ -58,7 +58,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
/**
* Sets the allowed lateness to a user-specified value.
- * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+ * If not explicitly set, the allowed lateness is [[0L]].
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],
http://git-wip-us.apache.org/repos/asf/flink/blob/87af8419/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
index 76d9cda..db187ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala
@@ -61,7 +61,7 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) {
/**
* Sets the allowed lateness to a user-specified value.
- * If not explicitly set, the allowed lateness is [[Long.MaxValue]].
+ * If not explicitly set, the allowed lateness is [[0L]].
* Setting the allowed lateness is only valid for event-time windows.
* If a value different than 0 is provided with a processing-time
* [[org.apache.flink.streaming.api.windowing.assigners.WindowAssigner]],