You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:52 UTC
[3/8] flink git commit: [FLINK-2550] Rework interplay of Window
Assigners and TimeCharacteristic
[FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff367d6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff367d6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff367d6e
Branch: refs/heads/master
Commit: ff367d6ea728a7c5bc334f34591a4d79e573972f
Parents: 28a38bb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue Oct 6 18:19:56 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:08:25 2015 +0200
----------------------------------------------------------------------
.../api/datastream/AllWindowedStream.java | 17 +--
.../streaming/api/datastream/DataStream.java | 20 +---
.../streaming/api/datastream/KeyedStream.java | 20 +---
.../api/datastream/WindowedStream.java | 29 ++---
.../environment/StreamExecutionEnvironment.java | 12 +++
.../api/windowing/assigners/GlobalWindows.java | 3 +-
.../assigners/SlidingProcessingTimeWindows.java | 106 -------------------
.../windowing/assigners/SlidingTimeWindows.java | 11 +-
.../TumblingProcessingTimeWindows.java | 81 --------------
.../assigners/TumblingTimeWindows.java | 11 +-
.../api/windowing/assigners/WindowAssigner.java | 3 +-
.../operators/windowing/WindowOperator.java | 5 +
.../flink/streaming/api/CoGroupJoinITCase.java | 6 +-
.../windowing/AllWindowTranslationTest.java | 60 +++++++----
.../windowing/WindowTranslationTest.java | 89 +++++++++++++---
.../streaming/examples/join/WindowJoin.java | 3 +-
.../scala/examples/join/WindowJoin.scala | 3 +-
.../flink/streaming/api/scala/DataStream.scala | 37 +------
.../flink/streaming/api/scala/KeyedStream.scala | 37 +------
.../api/scala/StreamExecutionEnvironment.scala | 5 +
.../api/scala/AllWindowTranslationTest.scala | 24 +++--
.../streaming/api/scala/CoGroupJoinITCase.scala | 9 +-
.../api/scala/WindowTranslationTest.scala | 22 ++--
23 files changed, 233 insertions(+), 380 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 89c4857..a8d7654 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -80,7 +81,7 @@ public class AllWindowedStream<T, W extends Window> {
WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger();
+ this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
/**
@@ -139,12 +140,14 @@ public class AllWindowedStream<T, W extends Window> {
OneInputStreamOperator<T, T> operator;
+ boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
new HeapWindowBuffer.Factory<T>(),
new ReduceAllWindowFunction<W, T>(function),
trigger,
- evictor);
+ evictor).enableSetProcessingTime(setProcessingTime);
} else {
// we need to copy because we need our own instance of the pre aggregator
@@ -154,7 +157,7 @@ public class AllWindowedStream<T, W extends Window> {
operator = new NonKeyedWindowOperator<>(windowAssigner,
new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
new ReduceAllWindowFunction<W, T>(function),
- trigger);
+ trigger).enableSetProcessingTime(setProcessingTime);
}
return input.transform(opName, input.getType(), operator).setParallelism(1);
@@ -205,20 +208,22 @@ public class AllWindowedStream<T, W extends Window> {
String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
- OneInputStreamOperator<T, R> operator;
+ NonKeyedWindowOperator<T, R, W> operator;
+
+ boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
operator = new EvictingNonKeyedWindowOperator<>(windowAssigner,
new HeapWindowBuffer.Factory<T>(),
function,
trigger,
- evictor);
+ evictor).enableSetProcessingTime(setProcessingTime);
} else {
operator = new NonKeyedWindowOperator<>(windowAssigner,
new HeapWindowBuffer.Factory<T>(),
function,
- trigger);
+ trigger).enableSetProcessingTime(setProcessingTime);
}
return input.transform(opName, resultType, operator).setParallelism(1);
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 0be1d56..ee8b3d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -59,9 +59,7 @@ import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -72,7 +70,6 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.ExtractTimestampsOperator;
@@ -726,13 +723,7 @@ public class DataStream<T> {
* @param size The size of the window.
*/
public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size) {
- AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
- if (actualSize instanceof EventTime) {
- return windowAll(TumblingTimeWindows.of(actualSize));
- } else {
- return windowAll(TumblingProcessingTimeWindows.of(actualSize));
- }
+ return windowAll(TumblingTimeWindows.of(size));
}
/**
@@ -747,14 +738,7 @@ public class DataStream<T> {
* @param size The size of the window.
*/
public AllWindowedStream<T, TimeWindow> timeWindowAll(AbstractTime size, AbstractTime slide) {
- AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
- AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
- if (actualSize instanceof EventTime) {
- return windowAll(SlidingTimeWindows.of(size, slide));
- } else {
- return windowAll(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
- }
+ return windowAll(SlidingTimeWindows.of(size, slide));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index edb7981..2e6d7d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,13 +32,10 @@ import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.time.EventTime;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
@@ -122,13 +119,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @param size The size of the window.
*/
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
- AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
- if (actualSize instanceof EventTime) {
- return window(TumblingTimeWindows.of(actualSize));
- } else {
- return window(TumblingProcessingTimeWindows.of(actualSize));
- }
+ return window(TumblingTimeWindows.of(size));
}
/**
@@ -143,14 +134,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
* @param size The size of the window.
*/
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
- AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
- AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
-
- if (actualSize instanceof EventTime) {
- return window(SlidingTimeWindows.of(actualSize, actualSlide));
- } else {
- return window(SlidingProcessingTimeWindows.of(actualSize, actualSlide));
- }
+ return window(SlidingTimeWindows.of(size, slide));
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1273b42..99f7d06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
@@ -32,8 +33,8 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ReduceWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
@@ -91,7 +92,7 @@ public class WindowedStream<T, K, W extends Window> {
WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
this.windowAssigner = windowAssigner;
- this.trigger = windowAssigner.getDefaultTrigger();
+ this.trigger = windowAssigner.getDefaultTrigger(input.getExecutionEnvironment());
}
/**
@@ -151,13 +152,15 @@ public class WindowedStream<T, K, W extends Window> {
OneInputStreamOperator<T, T> operator;
+ boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
if (evictor != null) {
operator = new EvictingWindowOperator<>(windowAssigner,
keySel,
new HeapWindowBuffer.Factory<T>(),
new ReduceWindowFunction<K, W, T>(function),
trigger,
- evictor);
+ evictor).enableSetProcessingTime(setProcessingTime);
} else {
// we need to copy because we need our own instance of the pre aggregator
@@ -168,7 +171,7 @@ public class WindowedStream<T, K, W extends Window> {
keySel,
new PreAggregatingHeapWindowBuffer.Factory<>(functionCopy),
new ReduceWindowFunction<K, W, T>(function),
- trigger);
+ trigger).enableSetProcessingTime(setProcessingTime);
}
return input.transform(opName, input.getType(), operator);
@@ -222,7 +225,9 @@ public class WindowedStream<T, K, W extends Window> {
String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
KeySelector<T, K> keySel = input.getKeySelector();
- OneInputStreamOperator<T, R> operator;
+ WindowOperator<K, T, R, W> operator;
+
+ boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
if (evictor != null) {
operator = new EvictingWindowOperator<>(windowAssigner,
@@ -230,14 +235,14 @@ public class WindowedStream<T, K, W extends Window> {
new HeapWindowBuffer.Factory<T>(),
function,
trigger,
- evictor);
+ evictor).enableSetProcessingTime(setProcessingTime);
} else {
operator = new WindowOperator<>(windowAssigner,
keySel,
new HeapWindowBuffer.Factory<T>(),
function,
- trigger);
+ trigger).enableSetProcessingTime(setProcessingTime);;
}
return input.transform(opName, resultType, operator);
@@ -450,8 +455,8 @@ public class WindowedStream<T, K, W extends Window> {
TypeInformation<R> resultType,
String functionName) {
- if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- SlidingProcessingTimeWindows timeWindows = (SlidingProcessingTimeWindows) windowAssigner;
+ if (windowAssigner instanceof SlidingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ SlidingTimeWindows timeWindows = (SlidingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSlide();
@@ -475,8 +480,8 @@ public class WindowedStream<T, K, W extends Window> {
wf, input.getKeySelector(), windowLength, windowSlide);
return input.transform(opName, resultType, op);
}
- } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
- TumblingProcessingTimeWindows timeWindows = (TumblingProcessingTimeWindows) windowAssigner;
+ } else if (windowAssigner instanceof TumblingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) {
+ TumblingTimeWindows timeWindows = (TumblingTimeWindows) windowAssigner;
final long windowLength = timeWindows.getSize();
final long windowSlide = timeWindows.getSize();
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index c2e2880..cc96217 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -541,11 +541,23 @@ public abstract class StreamExecutionEnvironment {
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
+ *
+ * <p>
+ * If you set the characteristic to IngestionTime of EventTime this will set a default
+ * watermark update interval of 200 ms. If this is not applicable for your application
+ * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*/
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Objects.requireNonNull(characteristic);
+ if (characteristic == TimeCharacteristic.ProcessingTime) {
+ getConfig().disableTimestamps();
+ getConfig().setAutoWatermarkInterval(0);
+ } else {
+ getConfig().enableTimestamps();
+ getConfig().setAutoWatermarkInterval(200);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 52c8f55..dbeb5ce 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
@@ -42,7 +43,7 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
}
@Override
- public Trigger<Object, GlobalWindow> getDefaultTrigger() {
+ public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
return null;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
deleted file mode 100644
index 65d7641..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * A {@link WindowAssigner} that windows elements into sliding, time-based windows. The windowing
- * is based on system time. Windows can possibly overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- * keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private final long size;
-
- private final long slide;
-
- private transient List<TimeWindow> result;
-
- private SlidingProcessingTimeWindows(long size, long slide) {
- this.size = size;
- this.slide = slide;
- this.result = Lists.newArrayListWithCapacity((int) (size / slide));
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- this.result = Lists.newArrayListWithCapacity((int) (size / slide));
- }
-
- @Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- result.clear();
- long time = System.currentTimeMillis();
- long lastStart = time - time % slide;
- for (long start = lastStart;
- start > time - size;
- start -= slide) {
- result.add(new TimeWindow(start, size));
- }
- return result;
- }
-
- public long getSize() {
- return size;
- }
-
- public long getSlide() {
- return slide;
- }
-
- @Override
- public Trigger<Object, TimeWindow> getDefaultTrigger() {
- return ProcessingTimeTrigger.create();
- }
-
- @Override
- public String toString() {
- return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
- }
-
- /**
- * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
- * elements to sliding time windows based on the current processing time.
- *
- * @param size The size of the generated windows.
- * @param slide The slide interval of the generated windows.
- * @return The time policy.
- */
- public static SlidingProcessingTimeWindows of(AbstractTime size, AbstractTime slide) {
- return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 52ae356..6036dfb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -17,7 +17,10 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -72,8 +75,12 @@ public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
}
@Override
- public Trigger<Object, TimeWindow> getDefaultTrigger() {
- return WatermarkTrigger.create();
+ public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+ if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
+ return ProcessingTimeTrigger.create();
+ } else {
+ return WatermarkTrigger.create();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
deleted file mode 100644
index 41f6362..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.api.windowing.assigners;
-
-import org.apache.flink.streaming.api.windowing.time.AbstractTime;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * A {@link WindowAssigner} that windows elements into time-based windows. The windowing is
- * based on system time. Windows cannot overlap.
- *
- * <p>
- * For example, in order to window into windows of 1 minute, every 10 seconds:
- * <pre> {@code
- * DataStream<Tuple2<String, Integer>> in = ...;
- * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
- * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
- * keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
- * } </pre>
- */
-public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
- private static final long serialVersionUID = 1L;
-
- private long size;
-
- private TumblingProcessingTimeWindows(long size) {
- this.size = size;
- }
-
- @Override
- public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
- long time = System.currentTimeMillis();
- long start = time - (time % size);
- return Collections.singletonList(new TimeWindow(start, size));
- }
-
- public long getSize() {
- return size;
- }
-
- @Override
- public Trigger<Object, TimeWindow> getDefaultTrigger() {
- return ProcessingTimeTrigger.create();
- }
-
- @Override
- public String toString() {
- return "TumblingProcessingTimeWindows(" + size + ")";
- }
-
- /**
- * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
- * elements to time windows based on the current processing time.
- *
- * @param size The size of the generated windows.
- * @return The time policy.
- */
- public static TumblingProcessingTimeWindows of(AbstractTime size) {
- return new TumblingProcessingTimeWindows(size.toMilliseconds());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index b6022b3..d57dc33 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -17,7 +17,10 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.AbstractTime;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
@@ -58,8 +61,12 @@ public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
}
@Override
- public Trigger<Object, TimeWindow> getDefaultTrigger() {
- return WatermarkTrigger.create();
+ public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+ if (env.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
+ return ProcessingTimeTrigger.create();
+ } else {
+ return WatermarkTrigger.create();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 20fe365..d0b1ed0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -17,6 +17,7 @@
*/
package org.apache.flink.streaming.api.windowing.assigners;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import java.io.Serializable;
@@ -50,5 +51,5 @@ public abstract class WindowAssigner<T, W extends Window> implements Serializabl
/**
* Returns the default trigger associated with this {@code WindowAssigner}.
*/
- public abstract Trigger<T, W> getDefaultTrigger();
+ public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 548afb3..368b8fa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -347,6 +347,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
// ------------------------------------------------------------------------
@VisibleForTesting
+ public boolean isSetProcessingTime() {
+ return setProcessingTime;
+ }
+
+ @VisibleForTesting
public Trigger<? super IN, ? super W> getTriggerTemplate() {
return triggerTemplate;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
index c06a608..9ddd6eb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoGroupJoinITCase.java
@@ -49,8 +49,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
testResults = Lists.newArrayList();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
- env.getConfig().enableTimestamps();
DataStream<Tuple2<String, Integer>> source1 = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@@ -144,8 +144,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
testResults = Lists.newArrayList();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
- env.getConfig().enableTimestamps();
DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
private static final long serialVersionUID = 1L;
@@ -239,8 +239,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
testResults = Lists.newArrayList();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
- env.getConfig().enableTimestamps();
DataStream<Tuple3<String, String, Integer>> source1 = env.addSource(new SourceFunction<Tuple3<String, String, Integer>>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
index 09a7149..4fa16ac 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AllWindowTranslationTest.java
@@ -19,25 +19,25 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
import org.apache.flink.util.Collector;
import org.junit.Assert;
-import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
@@ -50,30 +50,33 @@ import java.util.concurrent.TimeUnit;
public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
/**
- * These tests ensure that the fast aligned time windows operator is used if the
- * conditions are right.
- *
- * TODO: update once fast aligned time windows operator is in
+ * These tests ensure that the correct trigger is set when using event-time windows.
*/
- @Ignore
@Test
- public void testFastTimeWindows() throws Exception {
+ @SuppressWarnings("rawtypes")
+ public void testEventTime() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
- Assert.assertTrue(operator1 instanceof AggregatingProcessingTimeWindowOperator);
+ Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
+ NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+ Assert.assertFalse(winOperator1.isSetProcessingTime());
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+ Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -88,20 +91,26 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
- Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
+ Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
+ NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+ Assert.assertFalse(winOperator2.isSetProcessingTime());
+ Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@Test
@SuppressWarnings("rawtypes")
public void testNonEvicting() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduce(reducer);
@@ -109,12 +118,13 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator1 = (NonKeyedWindowOperator) operator1;
+ Assert.assertTrue(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -132,8 +142,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof NonKeyedWindowOperator);
NonKeyedWindowOperator winOperator2 = (NonKeyedWindowOperator) operator2;
+ Assert.assertTrue(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@@ -141,13 +152,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
@SuppressWarnings("rawtypes")
public void testEvicting() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .windowAll(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduce(reducer);
@@ -155,13 +167,14 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator1 = (EvictingNonKeyedWindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertFalse(winOperator1.isSetProcessingTime());
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, TimeWindow>() {
@@ -180,8 +193,9 @@ public class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof EvictingNonKeyedWindowOperator);
EvictingNonKeyedWindowOperator winOperator2 = (EvictingNonKeyedWindowOperator) operator2;
+ Assert.assertFalse(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
index 5124add..10fe734 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowTranslationTest.java
@@ -20,19 +20,20 @@ package org.apache.flink.streaming.runtime.operators.windowing;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
-import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
-import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.WatermarkTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.HeapWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.PreAggregatingHeapWindowBuffer;
@@ -59,13 +60,13 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DummyReducer reducer = new DummyReducer();
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS),
- Time.of(100, TimeUnit.MILLISECONDS)))
+ .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer);
OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
@@ -74,7 +75,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -92,10 +93,63 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
Assert.assertTrue(operator2 instanceof AccumulatingProcessingTimeWindowOperator);
}
+ /**
+ * These tests ensure that the correct trigger is set when using event-time windows.
+ */
+ @Test
+ @SuppressWarnings("rawtypes")
+ public void testEventTime() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+ DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+
+ DummyReducer reducer = new DummyReducer();
+
+ DataStream<Tuple2<String, Integer>> window1 = source
+ .keyBy(0)
+ .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .reduce(reducer);
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform1 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window1.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
+ Assert.assertTrue(operator1 instanceof WindowOperator);
+ WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertFalse(winOperator1.isSetProcessingTime());
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
+ Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
+
+ DataStream<Tuple2<String, Integer>> window2 = source
+ .keyBy(0)
+ .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void apply(Tuple tuple,
+ TimeWindow window,
+ Iterable<Tuple2<String, Integer>> values,
+ Collector<Tuple2<String, Integer>> out) throws Exception {
+
+ }
+ });
+
+ OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>> transform2 = (OneInputTransformation<Tuple2<String, Integer>, Tuple2<String, Integer>>) window2.getTransformation();
+ OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
+ Assert.assertTrue(operator2 instanceof WindowOperator);
+ WindowOperator winOperator2 = (WindowOperator) operator2;
+ Assert.assertFalse(winOperator2.isSetProcessingTime());
+ Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
+ }
+
@Test
@SuppressWarnings("rawtypes")
public void testNonEvicting() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -103,7 +157,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
.reduce(reducer);
@@ -111,13 +165,14 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof WindowOperator);
WindowOperator winOperator1 = (WindowOperator) operator1;
+ Assert.assertTrue(winOperator1.isSetProcessingTime());
Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof PreAggregatingHeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
private static final long serialVersionUID = 1L;
@@ -135,8 +190,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof WindowOperator);
WindowOperator winOperator2 = (WindowOperator) operator2;
+ Assert.assertTrue(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
@@ -144,6 +200,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
@SuppressWarnings("rawtypes")
public void testEvicting() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<Tuple2<String, Integer>> source = env.fromElements(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -151,7 +208,7 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
DataStream<Tuple2<String, Integer>> window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
+ .window(SlidingTimeWindows.of(Time.of(1, TimeUnit.SECONDS), Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(CountEvictor.of(100))
.reduce(reducer);
@@ -159,14 +216,15 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator1 = transform1.getOperator();
Assert.assertTrue(operator1 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator1 = (EvictingWindowOperator) operator1;
- Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof ProcessingTimeTrigger);
- Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingProcessingTimeWindows);
+ Assert.assertFalse(winOperator1.isSetProcessingTime());
+ Assert.assertTrue(winOperator1.getTriggerTemplate() instanceof WatermarkTrigger);
+ Assert.assertTrue(winOperator1.getWindowAssigner() instanceof SlidingTimeWindows);
Assert.assertTrue(winOperator1.getEvictor() instanceof CountEvictor);
Assert.assertTrue(winOperator1.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
DataStream<Tuple2<String, Integer>> window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(TimeEvictor.of(Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
@@ -185,8 +243,9 @@ public class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
OneInputStreamOperator<Tuple2<String, Integer>, Tuple2<String, Integer>> operator2 = transform2.getOperator();
Assert.assertTrue(operator2 instanceof EvictingWindowOperator);
EvictingWindowOperator winOperator2 = (EvictingWindowOperator) operator2;
+ Assert.assertFalse(winOperator2.isSetProcessingTime());
Assert.assertTrue(winOperator2.getTriggerTemplate() instanceof CountTrigger);
- Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingProcessingTimeWindows);
+ Assert.assertTrue(winOperator2.getWindowAssigner() instanceof TumblingTimeWindows);
Assert.assertTrue(winOperator2.getEvictor() instanceof TimeEvictor);
Assert.assertTrue(winOperator2.getWindowBufferFactory() instanceof HeapWindowBuffer.Factory);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
index 8abf9d6..5915a7a 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/WindowJoin.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -64,7 +65,7 @@ public class WindowJoin {
// obtain execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().enableTimestamps();
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// connect to the data sources for grades and salaries
Tuple2<DataStream<Tuple3<Long, String, Integer>>, DataStream<Tuple3<Long, String, Integer>>> input = getInputStreams(env);
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
index 225dab7..42484e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
+++ b/flink-staging/flink-streaming/flink-streaming-examples/src/main/scala/org/apache/flink/streaming/scala/examples/join/WindowJoin.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.scala.examples.join
import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
@@ -45,7 +46,7 @@ object WindowJoin {
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
- env.getConfig.enableTimestamps()
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//Create streams for grades and salaries by mapping the inputs to the corresponding objects
val grades = setGradesInput(env)
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 7babc40..fb4d75d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -34,7 +34,7 @@ import org.apache.flink.streaming.api.scala.function.StatefulFunction
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.policy.{EvictionPolicy, TriggerPolicy}
-import org.apache.flink.streaming.api.windowing.time.{AbstractTime, EventTime, ProcessingTime}
+import org.apache.flink.streaming.api.windowing.time.AbstractTime
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow, Window}
import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
@@ -624,20 +624,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* @param size The size of the window.
*/
def timeWindowAll(size: AbstractTime): AllWindowedStream[T, TimeWindow] = {
- val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
- val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
- actualSize match {
- case t: EventTime =>
- val assigner = TumblingTimeWindows.of(actualSize)
- .asInstanceOf[WindowAssigner[T, TimeWindow]]
- windowAll(assigner)
- case t: ProcessingTime =>
- val assigner = TumblingProcessingTimeWindows.of(actualSize)
- .asInstanceOf[WindowAssigner[T, TimeWindow]]
- windowAll(assigner)
- case _ => throw new RuntimeException("Invalid time: " + actualSize)
- }
+ val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ windowAll(assigner)
}
/**
@@ -651,23 +639,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* @param size The size of the window.
*/
def timeWindowAll(size: AbstractTime, slide: AbstractTime): AllWindowedStream[T, TimeWindow] = {
- val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
- val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
- val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
- actualSize match {
- case t: EventTime =>
- val assigner = SlidingTimeWindows.of(
- actualSize,
- actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
- windowAll(assigner)
- case t: ProcessingTime =>
- val assigner = SlidingProcessingTimeWindows.of(
- actualSize,
- actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
- windowAll(assigner)
- case _ => throw new RuntimeException("Invalid time: " + actualSize)
- }
+ val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ windowAll(assigner)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 0ce36aa..c605bb1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.functions.aggregation.SumAggregator
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator
import org.apache.flink.streaming.api.operators.StreamGroupedReduce
import org.apache.flink.streaming.api.windowing.assigners._
-import org.apache.flink.streaming.api.windowing.time.{ProcessingTime, EventTime, AbstractTime}
+import org.apache.flink.streaming.api.windowing.time.AbstractTime
import org.apache.flink.streaming.api.windowing.windows.{Window, TimeWindow}
import scala.reflect.ClassTag
import org.apache.flink.api.common.typeinfo.TypeInformation
@@ -50,20 +50,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* @param size The size of the window.
*/
def timeWindow(size: AbstractTime): WindowedStream[T, K, TimeWindow] = {
- val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
- val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
- actualSize match {
- case t: EventTime =>
- val assigner = TumblingTimeWindows.of(actualSize)
- .asInstanceOf[WindowAssigner[T, TimeWindow]]
- window(assigner)
- case t: ProcessingTime =>
- val assigner = TumblingProcessingTimeWindows.of(actualSize)
- .asInstanceOf[WindowAssigner[T, TimeWindow]]
- window(assigner)
- case _ => throw new RuntimeException("Invalid time: " + actualSize)
- }
+ val assigner = TumblingTimeWindows.of(size).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ window(assigner)
}
/**
@@ -78,23 +66,8 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
* @param size The size of the window.
*/
def timeWindow(size: AbstractTime, slide: AbstractTime): WindowedStream[T, K, TimeWindow] = {
- val env = new StreamExecutionEnvironment(javaStream.getExecutionEnvironment)
- val actualSize = size.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
- val actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(env.getStreamTimeCharacteristic)
-
- actualSize match {
- case t: EventTime =>
- val assigner = SlidingTimeWindows.of(
- actualSize,
- actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
- window(assigner)
- case t: ProcessingTime =>
- val assigner = SlidingProcessingTimeWindows.of(
- actualSize,
- actualSlide).asInstanceOf[WindowAssigner[T, TimeWindow]]
- window(assigner)
- case _ => throw new RuntimeException("Invalid time: " + actualSize)
- }
+ val assigner = SlidingTimeWindows.of(size, slide).asInstanceOf[WindowAssigner[T, TimeWindow]]
+ window(assigner)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 7492e48..f767aba 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -299,6 +299,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
*
+ * If you set the characteristic to IngestionTime of EventTime this will set a default
+ * watermark update interval of 200 ms. If this is not applicable for your application
+ * you should change it using
+ * [[org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)]]
+ *
* @param characteristic The time characteristic.
*/
def setStreamTimeCharacteristic(characteristic: TimeCharacteristic) : Unit = {
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
index dece9f6..99fcd07 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/AllWindowTranslationTest.scala
@@ -22,9 +22,10 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
import org.apache.flink.api.common.functions.RichReduceFunction
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
@@ -49,13 +50,14 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
@Test
def testFastTimeWindows(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val source = env.fromElements(("hello", 1), ("hello", 2))
val reducer = new DummyReducer
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(
+ .windowAll(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer)
@@ -69,7 +71,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
- .windowAll(SlidingProcessingTimeWindows.of(
+ .windowAll(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
@@ -96,7 +98,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(
+ .windowAll(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
@@ -110,13 +112,13 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
val winOperator1 = operator1.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+ assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
val window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
def apply(
@@ -133,7 +135,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[NonKeyedWindowOperator[_, _, _]])
val winOperator2 = operator2.asInstanceOf[NonKeyedWindowOperator[_, _, _]]
assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
@@ -146,7 +148,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val reducer = new DummyReducer
val window1 = source
- .windowAll(SlidingProcessingTimeWindows.of(
+ .windowAll(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
@@ -161,12 +163,12 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator1 = operator1.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+ assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
val window2 = source
- .windowAll(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .windowAll(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(1000))
.apply(new AllWindowFunction[(String, Int), (String, Int), TimeWindow]() {
@@ -185,7 +187,7 @@ class AllWindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator2 = operator2.asInstanceOf[EvictingNonKeyedWindowOperator[_, _, _]]
assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
index 7232309..3f6e10f 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupJoinITCase.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
import java.util.concurrent.TimeUnit
+import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.TimestampExtractor
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -38,9 +39,9 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
CoGroupJoinITCase.testResults = mutable.MutableList()
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
- env.getConfig.enableTimestamps
-
+
val source1 = env.addSource(new SourceFunction[(String, Int)]() {
def run(ctx: SourceFunction.SourceContext[(String, Int)]) {
ctx.collect(("a", 0))
@@ -101,8 +102,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
CoGroupJoinITCase.testResults = mutable.MutableList()
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
- env.getConfig.enableTimestamps
val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {
@@ -177,8 +178,8 @@ class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
CoGroupJoinITCase.testResults = mutable.MutableList()
val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
- env.getConfig.enableTimestamps
val source1 = env.addSource(new SourceFunction[(String, String, Int)]() {
def run(ctx: SourceFunction.SourceContext[(String, String, Int)]) {