You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 22:33:51 UTC

[2/8] flink git commit: [FLINK-2550] Rework interplay of Window Assigners and TimeCharacteristic

http://git-wip-us.apache.org/repos/asf/flink/blob/ff367d6e/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
index fa9c0a9..65f978c 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowTranslationTest.scala
@@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction
 import org.apache.flink.streaming.api.transformations.OneInputTransformation
-import org.apache.flink.streaming.api.windowing.assigners.{TumblingProcessingTimeWindows, SlidingProcessingTimeWindows}
+import org.apache.flink.streaming.api.windowing.assigners.{TumblingTimeWindows, SlidingTimeWindows}
 import org.apache.flink.streaming.api.windowing.evictors.{CountEvictor, TimeEvictor}
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.{ProcessingTimeTrigger, CountTrigger}
@@ -52,7 +52,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(
+      .window(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .reduce(reducer)
@@ -66,7 +66,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window2 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(
+      .window(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
@@ -95,7 +95,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(
+      .window(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .trigger(CountTrigger.of(100))
@@ -109,14 +109,14 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(operator1.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator1 = operator1.asInstanceOf[WindowOperator[_, _, _, _]]
     assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(
       winOperator1.getWindowBufferFactory.isInstanceOf[PreAggregatingHeapWindowBuffer.Factory[_]])
 
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
       def apply(
@@ -134,7 +134,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     assertTrue(operator2.isInstanceOf[WindowOperator[_, _, _, _]])
     val winOperator2 = operator2.asInstanceOf[WindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
 
@@ -148,7 +148,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
 
     val window1 = source
       .keyBy(0)
-      .window(SlidingProcessingTimeWindows.of(
+      .window(SlidingTimeWindows.of(
         Time.of(1, TimeUnit.SECONDS),
         Time.of(100, TimeUnit.MILLISECONDS)))
       .evictor(TimeEvictor.of(Time.of(1, TimeUnit.SECONDS)))
@@ -163,13 +163,13 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val winOperator1 = operator1.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
     assertTrue(winOperator1.getTriggerTemplate.isInstanceOf[ProcessingTimeTrigger])
     assertTrue(winOperator1.getEvictor.isInstanceOf[TimeEvictor[_]])
-    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingProcessingTimeWindows])
+    assertTrue(winOperator1.getWindowAssigner.isInstanceOf[SlidingTimeWindows])
     assertTrue(winOperator1.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
 
 
     val window2 = source
       .keyBy(0)
-      .window(TumblingProcessingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
+      .window(TumblingTimeWindows.of(Time.of(1, TimeUnit.SECONDS)))
       .trigger(CountTrigger.of(100))
       .evictor(CountEvictor.of(1000))
       .apply(new WindowFunction[(String, Int), (String, Int), Tuple, TimeWindow]() {
@@ -189,7 +189,7 @@ class WindowTranslationTest extends StreamingMultipleProgramsTestBase {
     val winOperator2 = operator2.asInstanceOf[EvictingWindowOperator[_, _, _, _]]
     assertTrue(winOperator2.getTriggerTemplate.isInstanceOf[CountTrigger[_]])
     assertTrue(winOperator2.getEvictor.isInstanceOf[CountEvictor[_]])
-    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingProcessingTimeWindows])
+    assertTrue(winOperator2.getWindowAssigner.isInstanceOf[TumblingTimeWindows])
     assertTrue(winOperator2.getWindowBufferFactory.isInstanceOf[HeapWindowBuffer.Factory[_]])
   }
 }