You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:51 UTC
[2/8] flink git commit: [FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic
http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index fa9c0a9..65f978c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
import org.apache.flink.api.java.tuple.Tuple
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
@@ -52,7 +52,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(
+ .window(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.reduce(reducer)
@@ -66,7 +66,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window2 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(
+ .window(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
@@ -95,7 +95,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(
+ .window(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.trigger(CountTrigger.of(100))
@@ -109,14 +109,14 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+ assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(
winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
val window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
def apply(
@@ -134,7 +134,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
@@ -148,7 +148,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val window1 = source
.keyBy(0)
- .window(SlidingProcessingTimeWindows.of(
+ .window(SlidingTimeWindows.of(
Time.of(1, TimeUnit.SECONDS),
Time.of(100, TimeUnit.MILLISECONDS)))
.evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
@@ -163,13 +163,13 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
- assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+ assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
val window2 = source
.keyBy(0)
- .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+ .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
.trigger(CountTrigger.of(100))
.evictor(CountEvictor.of(1000))
.apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
@@ -189,7 +189,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
- assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+ assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
}
}