You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/09/20 13:30:51 UTC
[flink] branch master updated: [FLINK-10050][DataStream API] Support allowedLateness in CoGroupedStreams.
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a8964ac [FLINK-10050][DataStream API] Support allowedLateness in CoGroupedStreams.
a8964ac is described below
commit a8964acb54e755761054a0efa73a7a7ddd6a53d9
Author: Eugen Yushin <eu...@gojuno.com>
AuthorDate: Sun Sep 2 10:35:43 2018 +0200
[FLINK-10050][DataStream API] Support allowedLateness in CoGroupedStreams.
This closes #6646.
---
.../streaming/api/datastream/CoGroupedStreams.java | 48 ++++++++++---
.../streaming/api/datastream/JoinedStreams.java | 69 +++++++++++++-----
.../streaming/api/datastream/WindowedStream.java | 8 +++
.../api/datastream/CoGroupedStreamsTest.java | 82 ++++++++++++++++++++++
.../api/datastream/JoinedStreamsTest.java | 81 +++++++++++++++++++++
.../streaming/api/scala/CoGroupedStreams.scala | 20 ++++--
.../flink/streaming/api/scala/JoinedStreams.scala | 21 ++++--
.../streaming/api/scala/CoGroupedStreamsTest.scala | 47 +++++++++++++
.../streaming/api/scala/JoinedStreamsTest.scala | 46 ++++++++++++
9 files changed, 388 insertions(+), 34 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 55009e1..0b22f3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
@@ -40,6 +41,7 @@ import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -183,7 +185,7 @@ public class CoGroupedStreams<T1, T2> {
*/
@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
}
}
@@ -215,6 +217,10 @@ public class CoGroupedStreams<T1, T2> {
private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+ private final Time allowedLateness;
+
+ private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
+
protected WithWindow(DataStream<T1> input1,
DataStream<T2> input2,
KeySelector<T1, KEY> keySelector1,
@@ -222,7 +228,8 @@ public class CoGroupedStreams<T1, T2> {
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
- Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+ Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+ Time allowedLateness) {
this.input1 = input1;
this.input2 = input2;
@@ -233,6 +240,8 @@ public class CoGroupedStreams<T1, T2> {
this.windowAssigner = windowAssigner;
this.trigger = trigger;
this.evictor = evictor;
+
+ this.allowedLateness = allowedLateness;
}
/**
@@ -241,7 +250,7 @@ public class CoGroupedStreams<T1, T2> {
@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
- windowAssigner, newTrigger, evictor);
+ windowAssigner, newTrigger, evictor, allowedLateness);
}
/**
@@ -254,7 +263,17 @@ public class CoGroupedStreams<T1, T2> {
@PublicEvolving
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
- windowAssigner, trigger, newEvictor);
+ windowAssigner, trigger, newEvictor, allowedLateness);
+ }
+
+ /**
+ * Sets the time by which elements are allowed to be late.
+ * @see WindowedStream#allowedLateness(Time)
+ */
+ @PublicEvolving
+ public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, trigger, evictor, newLateness);
}
/**
@@ -321,18 +340,21 @@ public class CoGroupedStreams<T1, T2> {
DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
// we explicitly create the keyed stream to manually pass the key type information in
- WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
+ windowedStream =
new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
.window(windowAssigner);
if (trigger != null) {
- windowOp.trigger(trigger);
+ windowedStream.trigger(trigger);
}
if (evictor != null) {
- windowOp.evictor(evictor);
+ windowedStream.evictor(evictor);
+ }
+ if (allowedLateness != null) {
+ windowedStream.allowedLateness(allowedLateness);
}
- return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
+ return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}
/**
@@ -351,6 +373,16 @@ public class CoGroupedStreams<T1, T2> {
public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}
+
+ @VisibleForTesting
+ Time getAllowedLateness() {
+ return allowedLateness;
+ }
+
+ @VisibleForTesting
+ WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
+ return windowedStream;
+ }
}
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index bb67c09..4c327bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.JoinFunction;
@@ -29,6 +30,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
@@ -166,7 +168,7 @@ public class JoinedStreams<T1, T2> {
*/
@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
- return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
}
}
@@ -198,6 +200,10 @@ public class JoinedStreams<T1, T2> {
private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
+ private final Time allowedLateness;
+
+ private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;
+
@PublicEvolving
protected WithWindow(DataStream<T1> input1,
DataStream<T2> input2,
@@ -206,7 +212,8 @@ public class JoinedStreams<T1, T2> {
TypeInformation<KEY> keyType,
WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
- Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+ Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+ Time allowedLateness) {
this.input1 = requireNonNull(input1);
this.input2 = requireNonNull(input2);
@@ -219,6 +226,8 @@ public class JoinedStreams<T1, T2> {
this.trigger = trigger;
this.evictor = evictor;
+
+ this.allowedLateness = allowedLateness;
}
/**
@@ -227,7 +236,7 @@ public class JoinedStreams<T1, T2> {
@PublicEvolving
public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
- windowAssigner, newTrigger, evictor);
+ windowAssigner, newTrigger, evictor, allowedLateness);
}
/**
@@ -239,7 +248,17 @@ public class JoinedStreams<T1, T2> {
@PublicEvolving
public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
- windowAssigner, trigger, newEvictor);
+ windowAssigner, trigger, newEvictor, allowedLateness);
+ }
+
+ /**
+ * Sets the time by which elements are allowed to be late.
+ * @see WindowedStream#allowedLateness(Time)
+ */
+ @PublicEvolving
+ public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+ return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+ windowAssigner, trigger, evictor, newLateness);
}
/**
@@ -295,14 +314,16 @@ public class JoinedStreams<T1, T2> {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
- return input1.coGroup(input2)
- .where(keySelector1)
- .equalTo(keySelector2)
- .window(windowAssigner)
- .trigger(trigger)
- .evictor(evictor)
- .apply(new FlatJoinCoGroupFunction<>(function), resultType);
+ coGroupedWindowedStream = input1.coGroup(input2)
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .allowedLateness(allowedLateness);
+ return coGroupedWindowedStream
+ .apply(new FlatJoinCoGroupFunction<>(function), resultType);
}
@@ -376,14 +397,16 @@ public class JoinedStreams<T1, T2> {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
- return input1.coGroup(input2)
- .where(keySelector1)
- .equalTo(keySelector2)
- .window(windowAssigner)
- .trigger(trigger)
- .evictor(evictor)
- .apply(new JoinCoGroupFunction<>(function), resultType);
+ coGroupedWindowedStream = input1.coGroup(input2)
+ .where(keySelector1)
+ .equalTo(keySelector2)
+ .window(windowAssigner)
+ .trigger(trigger)
+ .evictor(evictor)
+ .allowedLateness(allowedLateness);
+ return coGroupedWindowedStream
+ .apply(new JoinCoGroupFunction<>(function), resultType);
}
/**
@@ -402,6 +425,16 @@ public class JoinedStreams<T1, T2> {
public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
return (SingleOutputStreamOperator<T>) apply(function, resultType);
}
+
+ @VisibleForTesting
+ Time getAllowedLateness() {
+ return allowedLateness;
+ }
+
+ @VisibleForTesting
+ CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
+ return coGroupedWindowedStream;
+ }
}
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1f09b73..871d86f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.datastream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.Public;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.Function;
@@ -1539,4 +1540,11 @@ public class WindowedStream<T, K, W extends Window> {
public TypeInformation<T> getInputType() {
return input.getType();
}
+
+ // -------------------- Testing Methods --------------------
+
+ @VisibleForTesting
+ long getAllowedLateness() {
+ return allowedLateness;
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
new file mode 100644
index 0000000..9225a36
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link CoGroupedStreams}.
+ */
+public class CoGroupedStreamsTest {
+ private DataStream<String> dataStream1;
+ private DataStream<String> dataStream2;
+ private KeySelector<String, String> keySelector;
+ private TumblingEventTimeWindows tsAssigner;
+ private CoGroupFunction<String, String, String> coGroupFunction;
+
+ @Before
+ public void setUp() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ dataStream1 = env.fromElements("a1", "a2", "a3");
+ dataStream2 = env.fromElements("a1", "a2");
+ keySelector = element -> element;
+ tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1L));
+ coGroupFunction = (CoGroupFunction<String, String, String>) (first, second, out) -> out.collect("");
+ }
+
+ @Test
+ public void testDelegateToCoGrouped() {
+ Time lateness = Time.milliseconds(42L);
+
+ CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+ .coGroup(dataStream2)
+ .where(keySelector)
+ .equalTo(keySelector)
+ .window(tsAssigner)
+ .allowedLateness(lateness);
+
+ withLateness.apply(coGroupFunction, BasicTypeInfo.STRING_TYPE_INFO);
+
+ Assert.assertEquals(lateness.toMilliseconds(), withLateness.getWindowedStream().getAllowedLateness());
+ }
+
+ @Test
+ public void testSetAllowedLateness() {
+ Time lateness = Time.milliseconds(42L);
+
+ CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+ .coGroup(dataStream2)
+ .where(keySelector)
+ .equalTo(keySelector)
+ .window(tsAssigner)
+ .allowedLateness(lateness);
+
+ Assert.assertEquals(lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds());
+ }
+
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
new file mode 100644
index 0000000..3284c20
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link JoinedStreams}.
+ */
+public class JoinedStreamsTest {
+ private DataStream<String> dataStream1;
+ private DataStream<String> dataStream2;
+ private KeySelector<String, String> keySelector;
+ private TumblingEventTimeWindows tsAssigner;
+ private JoinFunction<String, String, String> joinFunction;
+
+ @Before
+ public void setUp() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ dataStream1 = env.fromElements("a1", "a2", "a3");
+ dataStream2 = env.fromElements("a1", "a2");
+ keySelector = element -> element;
+ tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1));
+ joinFunction = (first, second) -> first + second;
+ }
+
+ @Test
+ public void testDelegateToCoGrouped() {
+ Time lateness = Time.milliseconds(42L);
+
+ JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+ .join(dataStream2)
+ .where(keySelector)
+ .equalTo(keySelector)
+ .window(tsAssigner)
+ .allowedLateness(lateness);
+
+ withLateness.apply(joinFunction, BasicTypeInfo.STRING_TYPE_INFO);
+
+ Assert.assertEquals(lateness.toMilliseconds(), withLateness.getCoGroupedWindowedStream().getAllowedLateness().toMilliseconds());
+ }
+
+ @Test
+ public void testSetAllowedLateness() {
+ Time lateness = Time.milliseconds(42L);
+
+ JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+ .join(dataStream2)
+ .where(keySelector)
+ .equalTo(keySelector)
+ .window(tsAssigner)
+ .allowedLateness(lateness);
+
+ Assert.assertEquals(lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds());
+ }
+}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 101d358..f69f642 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -112,7 +113,7 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
throw new UnsupportedOperationException(
"You first need to specify KeySelectors for both inputs using where() and equalTo().")
}
- new WithWindow[W](clean(assigner), null, null)
+ new WithWindow[W](clean(assigner), null, null, null)
}
/**
@@ -125,7 +126,8 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
class WithWindow[W <: Window](
windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
- evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+ evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+ val allowedLateness: Time) {
/**
* Sets the [[Trigger]] that should be used to trigger window emission.
@@ -133,7 +135,7 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
@PublicEvolving
def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
: WithWindow[W] = {
- new WithWindow[W](windowAssigner, newTrigger, evictor)
+ new WithWindow[W](windowAssigner, newTrigger, evictor, allowedLateness)
}
/**
@@ -147,7 +149,16 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
def evictor(
newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
: WithWindow[W] = {
- new WithWindow[W](windowAssigner, trigger, newEvictor)
+ new WithWindow[W](windowAssigner, trigger, newEvictor, allowedLateness)
+ }
+
+ /**
+ * Sets the time by which elements are allowed to be late.
+ * Delegates to [[WindowedStream#allowedLateness(Time)]]
+ */
+ @PublicEvolving
+ def allowedLateness(newLateness: Time): WithWindow[W] = {
+ new WithWindow[W](windowAssigner, trigger, evictor, newLateness)
}
/**
@@ -202,6 +213,7 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
+ .allowedLateness(allowedLateness)
.apply(clean(function), implicitly[TypeInformation[T]]))
}
}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 93b5cc8..3898086 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -110,7 +111,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
"You first need to specify KeySelectors for both inputs using where() and equalTo().")
}
- new WithWindow[W](clean(assigner), null, null)
+ new WithWindow[W](clean(assigner), null, null, null)
}
/**
@@ -122,7 +123,8 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
class WithWindow[W <: Window](
windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
- evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+ evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+ val allowedLateness: Time) {
/**
* Sets the [[Trigger]] that should be used to trigger window emission.
@@ -130,7 +132,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
@PublicEvolving
def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
: WithWindow[W] = {
- new WithWindow[W](windowAssigner, newTrigger, evictor)
+ new WithWindow[W](windowAssigner, newTrigger, evictor, allowedLateness)
}
/**
@@ -142,7 +144,16 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
@PublicEvolving
def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
: WithWindow[W] = {
- new WithWindow[W](windowAssigner, trigger, newEvictor)
+ new WithWindow[W](windowAssigner, trigger, newEvictor, allowedLateness)
+ }
+
+ /**
+ * Sets the time by which elements are allowed to be late.
+ * Delegates to [[WindowedStream#allowedLateness(Time)]]
+ */
+ @PublicEvolving
+ def allowedLateness(newLateness: Time): WithWindow[W] = {
+ new WithWindow[W](windowAssigner, trigger, evictor, newLateness)
}
/**
@@ -191,6 +202,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
+ .allowedLateness(allowedLateness)
.apply(clean(function), implicitly[TypeInformation[T]]))
}
@@ -208,6 +220,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
.window(windowAssigner)
.trigger(trigger)
.evictor(evictor)
+ .allowedLateness(allowedLateness)
.apply(clean(function), implicitly[TypeInformation[T]]))
}
}
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupedStreamsTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupedStreamsTest.scala
new file mode 100644
index 0000000..0ef5b88
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupedStreamsTest.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.{Assert, Test}
+
+/**
+ * Unit test for [[org.apache.flink.streaming.api.scala.CoGroupedStreams]]
+ */
+class CoGroupedStreamsTest {
+ private val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ private val dataStream1 = env.fromElements("a1", "a2", "a3")
+ private val dataStream2 = env.fromElements("a1", "a2")
+ private val keySelector = (s: String) => s
+ private val tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1))
+
+ @Test
+ def testSetAllowedLateness(): Unit = {
+ val lateness = Time.milliseconds(42)
+ val withLateness = dataStream1.coGroup(dataStream2)
+ .where(keySelector)
+ .equalTo(keySelector)
+ .window(tsAssigner)
+ .allowedLateness(lateness)
+ Assert.assertEquals(lateness.toMilliseconds, withLateness.allowedLateness.toMilliseconds)
+ }
+
+}
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/JoinedStreamsTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/JoinedStreamsTest.scala
new file mode 100644
index 0000000..451882e
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/JoinedStreamsTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.{Assert, Test}
+
+/**
+ * Unit test for [[org.apache.flink.streaming.api.scala.JoinedStreams]]
+ */
+class JoinedStreamsTest {
+ private val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+ private val dataStream1 = env.fromElements("a1", "a2", "a3")
+ private val dataStream2 = env.fromElements("a1", "a2")
+ private val keySelector = (s: String) => s
+ private val tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1))
+
+ @Test
+ def testSetAllowedLateness(): Unit = {
+ val lateness = Time.milliseconds(42)
+ val withLateness = dataStream1.join(dataStream2)
+ .where(keySelector)
+ .equalTo(keySelector)
+ .window(tsAssigner)
+ .allowedLateness(lateness)
+ Assert.assertEquals(lateness.toMilliseconds, withLateness.allowedLateness.toMilliseconds)
+ }
+}