You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/05 16:42:45 UTC
[11/13] flink git commit: [FLINK-2550] Change Window API constructs to use Time instead of long
[FLINK-2550] Change Window API constructs to use Time instead of long
This covers assigners/triggers/evictors.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c2c7694
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c2c7694
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c2c7694
Branch: refs/heads/master
Commit: 8c2c76947a9a26e25d4539068b5253b265c71c23
Parents: 833b347
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Oct 3 16:39:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200
----------------------------------------------------------------------
.../streaming/api/datastream/DataStream.java | 10 +++---
.../streaming/api/datastream/KeyedStream.java | 8 ++---
.../assigners/SlidingProcessingTimeWindows.java | 5 +--
.../windowing/assigners/SlidingTimeWindows.java | 5 +--
.../TumblingProcessingTimeWindows.java | 5 +--
.../assigners/TumblingTimeWindows.java | 5 +--
.../api/windowing/evictors/TimeEvictor.java | 5 +--
.../api/windowing/time/AbstractTime.java | 3 ++
.../ContinuousProcessingTimeTrigger.java | 31 +++++++++---------
.../triggers/ContinuousWatermarkTrigger.java | 25 ++++++++-------
.../windowing/AllWindowTranslationTest.java | 17 +++++-----
.../windowing/NonKeyedWindowOperatorTest.java | 25 ++++++---------
.../operators/windowing/WindowOperatorTest.java | 16 +++++-----
.../windowing/WindowTranslationTest.java | 18 ++++++-----
.../flink/streaming/api/scala/DataStream.scala | 12 +++----
.../flink/streaming/api/scala/KeyedStream.scala | 12 +++----
.../api/scala/AllWindowTranslationTest.scala | 33 +++++++++++++-------
.../api/scala/WindowTranslationTest.scala | 24 +++++++++-----
18 files changed, 144 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 6d88416..32d9012 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -741,9 +741,9 @@ public class DataStream<T> {
AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
if (actualSize instanceof EventTime) {
- return windowAll(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+ return windowAll(TumblingTimeWindows.of(actualSize));
} else {
- return windowAll(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+ return windowAll(TumblingProcessingTimeWindows.of(actualSize));
}
}
@@ -763,11 +763,9 @@ public class DataStream<T> {
AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
if (actualSize instanceof EventTime) {
- return windowAll(SlidingTimeWindows.of(actualSize.toMilliseconds(),
- actualSlide.toMilliseconds()));
+ return windowAll(SlidingTimeWindows.of(size, slide));
} else {
- return windowAll(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(),
- actualSlide.toMilliseconds()));
+ return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 265886b..f7c5b53 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -125,9 +125,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
if (actualSize instanceof EventTime) {
- return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
+ return window(TumblingTimeWindows.of(actualSize));
} else {
- return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
+ return window(TumblingProcessingTimeWindows.of(actualSize));
}
}
@@ -147,9 +147,9 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
if (actualSize instanceof EventTime) {
- return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+ return window(SlidingTimeWindows.of(actualSize, actualSlide));
} else {
- return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
+ return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index a2d95c2..6fc79b0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.windowing.assigners;
import com.google.common.collect.Lists;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -86,7 +87,7 @@ public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWin
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
- public static SlidingProcessingTimeWindows of(long size, long slide) {
- return new SlidingProcessingTimeWindows(size, slide);
+ public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) {
+ return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index cb5a7a1..49bff05 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -75,7 +76,7 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
* @param slide The slide interval of the generated windows.
* @return The time policy.
*/
- public static SlidingTimeWindows of(long size, long slide) {
- return new SlidingTimeWindows(size, slide);
+ public static SlidingTimeWindows of(AbstractTime size, AbstractTime slide) {
+ return new SlidingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index b1ef857..1f2eebf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -61,7 +62,7 @@ public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWi
* @param size The size of the generated windows.
* @return The time policy.
*/
- public static TumblingProcessingTimeWindows of(long size) {
- return new TumblingProcessingTimeWindows(size);
+ public static TumblingProcessingTimeWindows of(AbstractTime size) {
+ return new TumblingProcessingTimeWindows(size.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index d19c97d..019f45b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -60,8 +61,8 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
* @param size The size of the generated windows.
* @return The time policy.
*/
- public static TumblingTimeWindows of(long size) {
- return new TumblingTimeWindows(size);
+ public static TumblingTimeWindows of(AbstractTime size) {
+ return new TumblingTimeWindows(size.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index c38100c..2965214 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.windowing.evictors;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -54,7 +55,7 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
return windowSize;
}
- public static <W extends Window> TimeEvictor<W> of(long windowSize) {
- return new TimeEvictor<>(windowSize);
+ public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
+ return new TimeEvictor<>(windowSize.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
index 1264c2a..3f8fb60 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/time/AbstractTime.java
@@ -24,6 +24,9 @@ import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
+/**
+ * Base class for {@link Time} implementations.
+ */
public abstract class AbstractTime {
/** The time unit for this policy's time interval */
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index da198be..24e8ce3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -18,32 +18,33 @@
package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;
- private long granularity;
+ private long interval;
private long nextFireTimestamp = 0;
- private ContinuousProcessingTimeTrigger(long granularity) {
- this.granularity = granularity;
+ private ContinuousProcessingTimeTrigger(long interval) {
+ this.interval = interval;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
long currentTime = System.currentTimeMillis();
if (nextFireTimestamp == 0) {
- long start = currentTime - (currentTime % granularity);
- nextFireTimestamp = start + granularity;
+ long start = currentTime - (currentTime % interval);
+ nextFireTimestamp = start + interval;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
if (currentTime > nextFireTimestamp) {
- long start = currentTime - (currentTime % granularity);
- nextFireTimestamp = start + granularity;
+ long start = currentTime - (currentTime % interval);
+ nextFireTimestamp = start + interval;
ctx.registerProcessingTimeTimer(nextFireTimestamp);
@@ -57,8 +58,8 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
// only fire if an element didn't already fire
long currentTime = System.currentTimeMillis();
if (currentTime > nextFireTimestamp) {
- long start = currentTime - (currentTime % granularity);
- nextFireTimestamp = start + granularity;
+ long start = currentTime - (currentTime % interval);
+ nextFireTimestamp = start + interval;
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
@@ -66,20 +67,20 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
@Override
public Trigger<Object, W> duplicate() {
- return new ContinuousProcessingTimeTrigger<>(granularity);
+ return new ContinuousProcessingTimeTrigger<>(interval);
}
@VisibleForTesting
- public long getGranularity() {
- return granularity;
+ public long getInterval() {
+ return interval;
}
@Override
public String toString() {
- return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+ return "ContinuousProcessingTimeTrigger(" + interval + ")";
}
- public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(long granularity) {
- return new ContinuousProcessingTimeTrigger<>(granularity);
+ public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
+ return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index 3b6dc6d..e11ceba 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -18,24 +18,25 @@
package org.apache.flink.streaming.api.windowing.triggers;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.windows.Window;
public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
private static final long serialVersionUID = 1L;
- private long granularity;
+ private long interval;
private boolean first = true;
- private ContinuousWatermarkTrigger(long granularity) {
- this.granularity = granularity;
+ private ContinuousWatermarkTrigger(long interval) {
+ this.interval = interval;
}
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) {
if (first) {
- long start = timestamp - (timestamp % granularity);
- long nextFireTimestamp = start + granularity;
+ long start = timestamp - (timestamp % interval);
+ long nextFireTimestamp = start + interval;
ctx.registerWatermarkTimer(nextFireTimestamp);
first = false;
@@ -46,26 +47,26 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
@Override
public TriggerResult onTime(long time, TriggerContext ctx) {
- ctx.registerWatermarkTimer(time + granularity);
+ ctx.registerWatermarkTimer(time + interval);
return TriggerResult.FIRE;
}
@Override
public Trigger<Object, W> duplicate() {
- return new ContinuousWatermarkTrigger<>(granularity);
+ return new ContinuousWatermarkTrigger<>(interval);
}
@Override
public String toString() {
- return "ContinuousProcessingTimeTrigger(" + granularity + ")";
+ return "ContinuousProcessingTimeTrigger(" + interval + ")";
}
@VisibleForTesting
- public long getGranularity() {
- return granularity;
+ public long getInterval() {
+ return interval;
}
- public static <W extends Window> ContinuousWatermarkTrigger<W> of(long granularity) {
- return new ContinuousWatermarkTrigger<>(granularity);
+ public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
+ return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index ee8c6d6..767b40c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -39,6 +40,8 @@ import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
/**
* These tests verify that the api calls on
* {@link org.apache.flink.streaming.api.datastream.AllWindowedStream} instantiate
@@ -62,7 +65,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduceWindow(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -70,7 +73,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -98,7 +101,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduceWindow(reducer);
@@ -111,7 +114,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(1000))
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -144,7 +147,7 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduceWindow(reducer);
@@ -158,9 +161,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(1000))
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
- .evictor(TimeEvictor.of(100))
+ .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
index 9b0bcc4..6cc8931 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperatorTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousWatermarkTrigger;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -49,6 +50,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Parameterized.class)
@@ -69,11 +71,11 @@ public class NonKeyedWindowOperatorTest {
public void testSlidingEventTimeWindows() throws Exception {
closeCalled.set(0);
- final int WINDOW_SIZE = 3000;
- final int WINDOW_SLIDE = 1000;
+ final int WINDOW_SIZE = 3;
+ final int WINDOW_SLIDE = 1;
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
- SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
+ SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
windowBufferFactory,
new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
@@ -150,10 +152,10 @@ public class NonKeyedWindowOperatorTest {
public void testTumblingEventTimeWindows() throws Exception {
closeCalled.set(0);
- final int WINDOW_SIZE = 3000;
+ final int WINDOW_SIZE = 3;
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new NonKeyedWindowOperator<>(
- TumblingTimeWindows.of(WINDOW_SIZE),
+ TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
windowBufferFactory,
new ReduceAllWindowFunction<TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
WatermarkTrigger.create());
@@ -228,13 +230,13 @@ public class NonKeyedWindowOperatorTest {
public void testContinuousWatermarkTrigger() throws Exception {
closeCalled.set(0);
- final int WINDOW_SIZE = 3000;
+ final int WINDOW_SIZE = 3;
NonKeyedWindowOperator<Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new NonKeyedWindowOperator<>(
GlobalWindows.create(),
windowBufferFactory,
new ReduceAllWindowFunction<GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
- ContinuousWatermarkTrigger.of(WINDOW_SIZE));
+ ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
@@ -421,13 +423,4 @@ public class NonKeyedWindowOperatorTest {
}
}
}
-
- private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple2<String, Integer> value) throws Exception {
- return value.f0;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 1bfd1d5..d387df0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
@@ -49,6 +50,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@RunWith(Parameterized.class)
@@ -69,11 +71,11 @@ public class WindowOperatorTest {
public void testSlidingEventTimeWindows() throws Exception {
closeCalled.set(0);
- final int WINDOW_SIZE = 3000;
- final int WINDOW_SLIDE = 1000;
+ final int WINDOW_SIZE = 3;
+ final int WINDOW_SLIDE = 1;
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- SlidingTimeWindows.of(WINDOW_SIZE, WINDOW_SLIDE),
+ SlidingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)),
new TupleKeySelector(),
windowBufferFactory,
new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
@@ -157,10 +159,10 @@ public class WindowOperatorTest {
public void testTumblingEventTimeWindows() throws Exception {
closeCalled.set(0);
- final int WINDOW_SIZE = 3000;
+ final int WINDOW_SIZE = 3;
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow> operator = new WindowOperator<>(
- TumblingTimeWindows.of(WINDOW_SIZE),
+ TumblingTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)),
new TupleKeySelector(),
windowBufferFactory,
new ReduceWindowFunction<String, TimeWindow, Tuple2<String, Integer>>(new SumReducer()),
@@ -240,14 +242,14 @@ public class WindowOperatorTest {
public void testContinuousWatermarkTrigger() throws Exception {
closeCalled.set(0);
- final int WINDOW_SIZE = 3000;
+ final int WINDOW_SIZE = 3;
WindowOperator<String, Tuple2<String, Integer>, Tuple2<String, Integer>, GlobalWindow> operator = new WindowOperator<>(
GlobalWindows.create(),
new TupleKeySelector(),
windowBufferFactory,
new ReduceWindowFunction<String, GlobalWindow, Tuple2<String, Integer>>(new SumReducer()),
- ContinuousWatermarkTrigger.of(WINDOW_SIZE));
+ ContinuousWatermarkTrigger.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS)));
operator.setInputType(TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"), new ExecutionConfig());
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index a3e6085..9dc6687 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeW
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -40,6 +41,8 @@ import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
+import java.util.concurrent.TimeUnit;
+
/**
* These tests verify that the api calls on
* {@link WindowedStream} instantiate
@@ -61,7 +64,8 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
.reduceWindow(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -70,7 +74,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -99,7 +103,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduceWindow(reducer);
@@ -113,7 +117,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(1000))
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -147,7 +151,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduceWindow(reducer);
@@ -162,9 +166,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(1000))
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
- .evictor(TimeEvictor.of(100))
+ .evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7dfaeef..6ad7629 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -629,11 +629,11 @@ class DataStream[T](javaStream: JavaStream[T]) {
actualSize match {
case t: EventTime =>
- val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds)
+ val assigner = TumblingTimeWindows.of(actualSize)
.asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
case t: ProcessingTime =>
- val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds)
+ val assigner = TumblingProcessingTimeWindows.of(actualSize)
.asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
case _ => throw new RuntimeException("Invalid time: " + actualSize)
@@ -658,13 +658,13 @@ class DataStream[T](javaStream: JavaStream[T]) {
actualSize match {
case t: EventTime =>
val assigner = SlidingTimeWindows.of(
- actualSize.toMilliseconds,
- actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ actualSize,
+ actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
case t: ProcessingTime =>
val assigner = SlidingProcessingTimeWindows.of(
- actualSize.toMilliseconds,
- actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ actualSize,
+ actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
windowAll(assigner)
case _ => throw new RuntimeException("Invalid time: " + actualSize)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 232e4bb..18b71be 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -55,11 +55,11 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
actualSize match {
case t: EventTime =>
- val assigner = TumblingTimeWindows.of(actualSize.toMilliseconds)
+ val assigner = TumblingTimeWindows.of(actualSize)
.asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
case t: ProcessingTime =>
- val assigner = TumblingProcessingTimeWindows.of(actualSize.toMilliseconds)
+ val assigner = TumblingProcessingTimeWindows.of(actualSize)
.asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
case _ => throw new RuntimeException("Invalid time: " + actualSize)
@@ -85,13 +85,13 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
actualSize match {
case t: EventTime =>
val assigner = SlidingTimeWindows.of(
- actualSize.toMilliseconds,
- actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ actualSize,
+ actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
case t: ProcessingTime =>
val assigner = SlidingProcessingTimeWindows.of(
- actualSize.toMilliseconds,
- actualSlide.toMilliseconds).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ actualSize,
+ actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
window(assigner)
case _ => throw new RuntimeException("Invalid time: " + actualSize)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index 35c7fcc..247256f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -19,13 +19,15 @@
package org.apache.flink.streaming.api.scala
+import java.util.concurrent.TimeUnit
+
import org.apache.flink.api.common.functions.RichReduceFunction
-import org.apache.flink.api.java.tuple.Tuple
-import org.apache.flink.streaming.api.functions.windowing.{AllWindowFunction, WindowFunction}
+import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
-import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, WatermarkTrigger, CountTrigger}
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
import org.apache.flink.streaming.runtime.operators.windowing._
@@ -53,7 +55,9 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .windowAll(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
.reduceWindow(reducer)
val transform1 = window1.getJavaStream.getTransformation
@@ -65,10 +69,11 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
- .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
+ .windowAll(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
+ .apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
def apply(
- tuple: Tuple,
window: TimeWindow,
values: java.lang.Iterable[(String, Int)],
out: Collector[(String, Int)]) { }
@@ -91,7 +96,9 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
+ .windowAll(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduceWindow(reducer)
@@ -109,7 +116,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(1000))
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
def apply(
@@ -139,8 +146,10 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(1000, 100))
- .evictor(TimeEvictor.of(1000))
+ .windowAll(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
.reduceWindow(reducer)
val transform1 = window1.getJavaStream.getTransformation
@@ -157,7 +166,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(1000))
+ .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(1000))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
http://git-wip-us.apache.org/repos/asf/flink/blob/8c2c7694/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index 49d0a1a..f1b05c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -18,12 +18,14 @@
package org.apache.flink.streaming.api.scala
+import java.util.concurrent.TimeUnit
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.runtime.operators.windowing.buffers.{HeapWindowBuffer, PreAggregatingHeapWindowBuffer}
@@ -50,7 +52,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
.reduceWindow(reducer)
val transform1 = window1.getJavaStream.getTransformation
@@ -62,7 +66,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
def apply(
key: Tuple,
@@ -89,7 +95,9 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
+ .window(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduceWindow(reducer)
@@ -108,7 +116,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(1000))
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
def apply(
@@ -140,8 +148,10 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(1000, 100))
- .evictor(TimeEvictor.of(1000))
+ .window(SlidingProcessingTimeWindows.of(
+ Time.of(1, TimeUnit.SECONDS),
+ Time.of(100, TimeUnit.MILLISECONDS)))
+ .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
.reduceWindow(reducer)
val transform1 = window1.getJavaStream.getTransformation
@@ -159,7 +169,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(1000))
+ .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(1000))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {