You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:30 UTC

[2/6] flink git commit: [FLINK-4955] Add Translation Tests for KeyedStream.flatMap(TimelyFlatMapFunction)

[FLINK-4955] Add Translation Tests for KeyedStream.flatMap(TimelyFlatMapFunction)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06fb9f1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06fb9f1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06fb9f1b

Branch: refs/heads/master
Commit: 06fb9f1b4f97ade67a23cd3adc8212e7d848de48
Parents: f0ef370
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:58:29 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/DataStreamTest.java     | 50 ++++++++++++++++++++
 .../streaming/api/scala/DataStreamTest.scala    | 28 ++++++++++-
 2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 17bea68..5e43120 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -544,6 +546,45 @@ public class DataStreamTest {
 		assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
 	}
 
+	/**
+	 * Verify that a timely flat map call is correctly translated to an operator.
+	 */
+	@Test
+	public void testTimelyFlatMapTranslation() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+		TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(
+					Long value,
+					TimerService timerService,
+					Collector<Integer> out) throws Exception {
+
+			}
+
+			@Override
+			public void onTimer(
+					long timestamp,
+					TimeDomain timeDomain,
+					TimerService timerService,
+					Collector<Integer> out) throws Exception {
+
+			}
+		};
+
+		DataStream<Integer> flatMapped = src
+				.keyBy(new IdentityKeySelector<Long>())
+				.flatMap(timelyFlatMapFunction);
+
+		flatMapped.addSink(new DiscardingSink<Integer>());
+
+		assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
+		assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
+	}
+
 	@Test
 	public void operatorTest() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -809,6 +850,15 @@ public class DataStreamTest {
 		}
 	}
 
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
 	public static class CustomPOJO {
 		private String s;
 		private int i;

http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b73eae8..de8b388 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,10 +22,12 @@ import java.lang
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.{TimeDomain, TimerService}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -315,6 +317,30 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // TODO check for custom case class
   }
 
+  /**
+   * Verify that a timely flat map call is correctly translated to an operator.
+   */
+  @Test
+  def testTimelyFlatMapTranslation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.generateSequence(0, 0)
+
+    val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
+      override def flatMap(value: Long, timerService: TimerService, out: Collector[Int]): Unit = ???
+      override def onTimer(
+          timestamp: Long,
+          timeDomain: TimeDomain,
+          timerService: TimerService,
+          out: Collector[Int]): Unit = ???
+    }
+
+    val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)
+
+    assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
+    assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]])
+  }
+
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment