You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2018/09/20 13:30:51 UTC

[flink] branch master updated: [FLINK-10050][DataStream API] Support allowedLateness in CoGroupedStreams.

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a8964ac  [FLINK-10050][DataStream API] Support allowedLateness in CoGroupedStreams.
a8964ac is described below

commit a8964acb54e755761054a0efa73a7a7ddd6a53d9
Author: Eugen Yushin <eu...@gojuno.com>
AuthorDate: Sun Sep 2 10:35:43 2018 +0200

    [FLINK-10050][DataStream API] Support allowedLateness in CoGroupedStreams.
    
    This closes #6646.
---
 .../streaming/api/datastream/CoGroupedStreams.java | 48 ++++++++++---
 .../streaming/api/datastream/JoinedStreams.java    | 69 +++++++++++++-----
 .../streaming/api/datastream/WindowedStream.java   |  8 +++
 .../api/datastream/CoGroupedStreamsTest.java       | 82 ++++++++++++++++++++++
 .../api/datastream/JoinedStreamsTest.java          | 81 +++++++++++++++++++++
 .../streaming/api/scala/CoGroupedStreams.scala     | 20 ++++--
 .../flink/streaming/api/scala/JoinedStreams.scala  | 21 ++++--
 .../streaming/api/scala/CoGroupedStreamsTest.scala | 47 +++++++++++++
 .../streaming/api/scala/JoinedStreamsTest.scala    | 46 ++++++++++++
 9 files changed, 388 insertions(+), 34 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
index 55009e1..0b22f3a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/CoGroupedStreams.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -40,6 +41,7 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -183,7 +185,7 @@ public class CoGroupedStreams<T1, T2> {
 			 */
 			@PublicEvolving
 			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
 			}
 		}
 	}
@@ -215,6 +217,10 @@ public class CoGroupedStreams<T1, T2> {
 
 		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
+		private final Time allowedLateness;
+
+		private WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowedStream;
+
 		protected WithWindow(DataStream<T1> input1,
 				DataStream<T2> input2,
 				KeySelector<T1, KEY> keySelector1,
@@ -222,7 +228,8 @@ public class CoGroupedStreams<T1, T2> {
 				TypeInformation<KEY> keyType,
 				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
 				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
-				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+				Time allowedLateness) {
 			this.input1 = input1;
 			this.input2 = input2;
 
@@ -233,6 +240,8 @@ public class CoGroupedStreams<T1, T2> {
 			this.windowAssigner = windowAssigner;
 			this.trigger = trigger;
 			this.evictor = evictor;
+
+			this.allowedLateness = allowedLateness;
 		}
 
 		/**
@@ -241,7 +250,7 @@ public class CoGroupedStreams<T1, T2> {
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, newTrigger, evictor);
+					windowAssigner, newTrigger, evictor, allowedLateness);
 		}
 
 		/**
@@ -254,7 +263,17 @@ public class CoGroupedStreams<T1, T2> {
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
+					windowAssigner, trigger, newEvictor, allowedLateness);
+		}
+
+		/**
+		 * Sets the time by which elements are allowed to be late.
+		 * @see WindowedStream#allowedLateness(Time)
+		 */
+		@PublicEvolving
+		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+					windowAssigner, trigger, evictor, newLateness);
 		}
 
 		/**
@@ -321,18 +340,21 @@ public class CoGroupedStreams<T1, T2> {
 			DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
 
 			// we explicitly create the keyed stream to manually pass the key type information in
-			WindowedStream<TaggedUnion<T1, T2>, KEY, W> windowOp =
+			windowedStream =
 					new KeyedStream<TaggedUnion<T1, T2>, KEY>(unionStream, unionKeySelector, keyType)
 					.window(windowAssigner);
 
 			if (trigger != null) {
-				windowOp.trigger(trigger);
+				windowedStream.trigger(trigger);
 			}
 			if (evictor != null) {
-				windowOp.evictor(evictor);
+				windowedStream.evictor(evictor);
+			}
+			if (allowedLateness != null) {
+				windowedStream.allowedLateness(allowedLateness);
 			}
 
-			return windowOp.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
+			return windowedStream.apply(new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
 		}
 
 		/**
@@ -351,6 +373,16 @@ public class CoGroupedStreams<T1, T2> {
 		public <T> SingleOutputStreamOperator<T> with(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
 			return (SingleOutputStreamOperator<T>) apply(function, resultType);
 		}
+
+		@VisibleForTesting
+		Time getAllowedLateness() {
+			return allowedLateness;
+		}
+
+		@VisibleForTesting
+		WindowedStream<TaggedUnion<T1, T2>, KEY, W> getWindowedStream() {
+			return windowedStream;
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
index bb67c09..4c327bf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/JoinedStreams.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.datastream;
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -29,6 +30,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.CoGroupedStreams.TaggedUnion;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.util.Collector;
@@ -166,7 +168,7 @@ public class JoinedStreams<T1, T2> {
 			 */
 			@PublicEvolving
 			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
-				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null);
+				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
 			}
 		}
 	}
@@ -198,6 +200,10 @@ public class JoinedStreams<T1, T2> {
 
 		private final Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor;
 
+		private final Time allowedLateness;
+
+		private CoGroupedStreams.WithWindow<T1, T2, KEY, W> coGroupedWindowedStream;
+
 		@PublicEvolving
 		protected WithWindow(DataStream<T1> input1,
 				DataStream<T2> input2,
@@ -206,7 +212,8 @@ public class JoinedStreams<T1, T2> {
 				TypeInformation<KEY> keyType,
 				WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
 				Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
-				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor) {
+				Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
+				Time allowedLateness) {
 
 			this.input1 = requireNonNull(input1);
 			this.input2 = requireNonNull(input2);
@@ -219,6 +226,8 @@ public class JoinedStreams<T1, T2> {
 
 			this.trigger = trigger;
 			this.evictor = evictor;
+
+			this.allowedLateness = allowedLateness;
 		}
 
 		/**
@@ -227,7 +236,7 @@ public class JoinedStreams<T1, T2> {
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> trigger(Trigger<? super TaggedUnion<T1, T2>, ? super W> newTrigger) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, newTrigger, evictor);
+					windowAssigner, newTrigger, evictor, allowedLateness);
 		}
 
 		/**
@@ -239,7 +248,17 @@ public class JoinedStreams<T1, T2> {
 		@PublicEvolving
 		public WithWindow<T1, T2, KEY, W> evictor(Evictor<? super TaggedUnion<T1, T2>, ? super W> newEvictor) {
 			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
-					windowAssigner, trigger, newEvictor);
+					windowAssigner, trigger, newEvictor, allowedLateness);
+		}
+
+		/**
+		 * Sets the time by which elements are allowed to be late.
+		 * @see WindowedStream#allowedLateness(Time)
+		 */
+		@PublicEvolving
+		public WithWindow<T1, T2, KEY, W> allowedLateness(Time newLateness) {
+			return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType,
+				windowAssigner, trigger, evictor, newLateness);
 		}
 
 		/**
@@ -295,14 +314,16 @@ public class JoinedStreams<T1, T2> {
 			//clean the closure
 			function = input1.getExecutionEnvironment().clean(function);
 
-			return input1.coGroup(input2)
-					.where(keySelector1)
-					.equalTo(keySelector2)
-					.window(windowAssigner)
-					.trigger(trigger)
-					.evictor(evictor)
-					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
+			coGroupedWindowedStream = input1.coGroup(input2)
+				.where(keySelector1)
+				.equalTo(keySelector2)
+				.window(windowAssigner)
+				.trigger(trigger)
+				.evictor(evictor)
+				.allowedLateness(allowedLateness);
 
+			return coGroupedWindowedStream
+					.apply(new FlatJoinCoGroupFunction<>(function), resultType);
 		}
 
 
@@ -376,14 +397,16 @@ public class JoinedStreams<T1, T2> {
 			//clean the closure
 			function = input1.getExecutionEnvironment().clean(function);
 
-			return input1.coGroup(input2)
-					.where(keySelector1)
-					.equalTo(keySelector2)
-					.window(windowAssigner)
-					.trigger(trigger)
-					.evictor(evictor)
-					.apply(new JoinCoGroupFunction<>(function), resultType);
+			coGroupedWindowedStream = input1.coGroup(input2)
+				.where(keySelector1)
+				.equalTo(keySelector2)
+				.window(windowAssigner)
+				.trigger(trigger)
+				.evictor(evictor)
+				.allowedLateness(allowedLateness);
 
+			return coGroupedWindowedStream
+					.apply(new JoinCoGroupFunction<>(function), resultType);
 		}
 
 		/**
@@ -402,6 +425,16 @@ public class JoinedStreams<T1, T2> {
 		public <T> SingleOutputStreamOperator<T> with(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
 			return (SingleOutputStreamOperator<T>) apply(function, resultType);
 		}
+
+		@VisibleForTesting
+		Time getAllowedLateness() {
+			return allowedLateness;
+		}
+
+		@VisibleForTesting
+		CoGroupedStreams.WithWindow<T1, T2, KEY, W> getCoGroupedWindowedStream() {
+			return coGroupedWindowedStream;
+		}
 	}
 
 	// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 1f09b73..871d86f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
@@ -1539,4 +1540,11 @@ public class WindowedStream<T, K, W extends Window> {
 	public TypeInformation<T> getInputType() {
 		return input.getType();
 	}
+
+	// -------------------- Testing Methods --------------------
+
+	@VisibleForTesting
+	long getAllowedLateness() {
+		return allowedLateness;
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
new file mode 100644
index 0000000..9225a36
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/CoGroupedStreamsTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link CoGroupedStreams}.
+ */
+public class CoGroupedStreamsTest {
+	private DataStream<String> dataStream1;
+	private DataStream<String> dataStream2;
+	private KeySelector<String, String> keySelector;
+	private TumblingEventTimeWindows tsAssigner;
+	private CoGroupFunction<String, String, String> coGroupFunction;
+
+	@Before
+	public void setUp() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		dataStream1 = env.fromElements("a1", "a2", "a3");
+		dataStream2 = env.fromElements("a1", "a2");
+		keySelector = element -> element;
+		tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1L));
+		coGroupFunction = (CoGroupFunction<String, String, String>) (first, second, out) -> out.collect("");
+	}
+
+	@Test
+	public void testDelegateToCoGrouped() {
+		Time lateness = Time.milliseconds(42L);
+
+		CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+			.coGroup(dataStream2)
+			.where(keySelector)
+			.equalTo(keySelector)
+			.window(tsAssigner)
+			.allowedLateness(lateness);
+
+		withLateness.apply(coGroupFunction, BasicTypeInfo.STRING_TYPE_INFO);
+
+		Assert.assertEquals(lateness.toMilliseconds(), withLateness.getWindowedStream().getAllowedLateness());
+	}
+
+	@Test
+	public void testSetAllowedLateness() {
+		Time lateness = Time.milliseconds(42L);
+
+		CoGroupedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+			.coGroup(dataStream2)
+			.where(keySelector)
+			.equalTo(keySelector)
+			.window(tsAssigner)
+			.allowedLateness(lateness);
+
+		Assert.assertEquals(lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds());
+	}
+
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
new file mode 100644
index 0000000..3284c20
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/JoinedStreamsTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit test for {@link JoinedStreams}.
+ */
+public class JoinedStreamsTest {
+	private DataStream<String> dataStream1;
+	private DataStream<String> dataStream2;
+	private KeySelector<String, String> keySelector;
+	private TumblingEventTimeWindows tsAssigner;
+	private JoinFunction<String, String, String> joinFunction;
+
+	@Before
+	public void setUp() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		dataStream1 = env.fromElements("a1", "a2", "a3");
+		dataStream2 = env.fromElements("a1", "a2");
+		keySelector = element -> element;
+		tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1));
+		joinFunction = (first, second) -> first + second;
+	}
+
+	@Test
+	public void testDelegateToCoGrouped() {
+		Time lateness = Time.milliseconds(42L);
+
+		JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+			.join(dataStream2)
+			.where(keySelector)
+			.equalTo(keySelector)
+			.window(tsAssigner)
+			.allowedLateness(lateness);
+
+		withLateness.apply(joinFunction, BasicTypeInfo.STRING_TYPE_INFO);
+
+		Assert.assertEquals(lateness.toMilliseconds(), withLateness.getCoGroupedWindowedStream().getAllowedLateness().toMilliseconds());
+	}
+
+	@Test
+	public void testSetAllowedLateness() {
+		Time lateness = Time.milliseconds(42L);
+
+		JoinedStreams.WithWindow<String, String, String, TimeWindow> withLateness = dataStream1
+			.join(dataStream2)
+			.where(keySelector)
+			.equalTo(keySelector)
+			.window(tsAssigner)
+			.allowedLateness(lateness);
+
+		Assert.assertEquals(lateness.toMilliseconds(), withLateness.getAllowedLateness().toMilliseconds());
+	}
+}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 101d358..f69f642 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
@@ -112,7 +113,7 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
           throw new UnsupportedOperationException(
             "You first need to specify KeySelectors for both inputs using where() and equalTo().")
         }
-        new WithWindow[W](clean(assigner), null, null)
+        new WithWindow[W](clean(assigner), null, null, null)
       }
 
       /**
@@ -125,7 +126,8 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
       class WithWindow[W <: Window](
           windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
           trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
-          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+          val allowedLateness: Time) {
 
         /**
          * Sets the [[Trigger]] that should be used to trigger window emission.
@@ -133,7 +135,7 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
         @PublicEvolving
         def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
             : WithWindow[W] = {
-          new WithWindow[W](windowAssigner, newTrigger, evictor)
+          new WithWindow[W](windowAssigner, newTrigger, evictor, allowedLateness)
         }
 
         /**
@@ -147,7 +149,16 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
         def evictor(
             newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
             : WithWindow[W] = {
-          new WithWindow[W](windowAssigner, trigger, newEvictor)
+          new WithWindow[W](windowAssigner, trigger, newEvictor, allowedLateness)
+        }
+
+        /**
+          * Sets the time by which elements are allowed to be late.
+          * Delegates to [[WindowedStream#allowedLateness(Time)]].
+          */
+        @PublicEvolving
+        def allowedLateness(newLateness: Time): WithWindow[W] = {
+          new WithWindow[W](windowAssigner, trigger, evictor, newLateness)
         }
 
         /**
@@ -202,6 +213,7 @@ class CoGroupedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
             .window(windowAssigner)
             .trigger(trigger)
             .evictor(evictor)
+            .allowedLateness(allowedLateness)
             .apply(clean(function), implicitly[TypeInformation[T]]))
         }
       }
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index 93b5cc8..3898086 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
+import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.triggers.Trigger
 import org.apache.flink.streaming.api.windowing.windows.Window
 import org.apache.flink.util.Collector
@@ -110,7 +111,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
             "You first need to specify KeySelectors for both inputs using where() and equalTo().")
         }
 
-        new WithWindow[W](clean(assigner), null, null)
+        new WithWindow[W](clean(assigner), null, null, null)
       }
 
       /**
@@ -122,7 +123,8 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
       class WithWindow[W <: Window](
           windowAssigner: WindowAssigner[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], W],
           trigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
-          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W]) {
+          evictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W],
+          val allowedLateness: Time) {
 
         /**
          * Sets the [[Trigger]] that should be used to trigger window emission.
@@ -130,7 +132,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
         @PublicEvolving
         def trigger(newTrigger: Trigger[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
         : WithWindow[W] = {
-          new WithWindow[W](windowAssigner, newTrigger, evictor)
+          new WithWindow[W](windowAssigner, newTrigger, evictor, allowedLateness)
         }
 
         /**
@@ -142,7 +144,16 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
         @PublicEvolving
         def evictor(newEvictor: Evictor[_ >: JavaCoGroupedStreams.TaggedUnion[T1, T2], _ >: W])
         : WithWindow[W] = {
-          new WithWindow[W](windowAssigner, trigger, newEvictor)
+          new WithWindow[W](windowAssigner, trigger, newEvictor, allowedLateness)
+        }
+
+        /**
+          * Sets the time by which elements are allowed to be late.
+          * Delegates to [[WindowedStream#allowedLateness(Time)]].
+          */
+        @PublicEvolving
+        def allowedLateness(newLateness: Time): WithWindow[W] = {
+          new WithWindow[W](windowAssigner, trigger, evictor, newLateness)
         }
 
         /**
@@ -191,6 +202,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
             .window(windowAssigner)
             .trigger(trigger)
             .evictor(evictor)
+            .allowedLateness(allowedLateness)
             .apply(clean(function), implicitly[TypeInformation[T]]))
         }
 
@@ -208,6 +220,7 @@ class JoinedStreams[T1, T2](input1: DataStream[T1], input2: DataStream[T2]) {
             .window(windowAssigner)
             .trigger(trigger)
             .evictor(evictor)
+            .allowedLateness(allowedLateness)
             .apply(clean(function), implicitly[TypeInformation[T]]))
         }
       }
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupedStreamsTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupedStreamsTest.scala
new file mode 100644
index 0000000..0ef5b88
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/CoGroupedStreamsTest.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.{Assert, Test}
+
+/**
+  * Unit test for [[org.apache.flink.streaming.api.scala.CoGroupedStreams]].
+  */
+class CoGroupedStreamsTest {
+  private val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  private val dataStream1 = env.fromElements("a1", "a2", "a3")
+  private val dataStream2 = env.fromElements("a1", "a2")
+  private val keySelector = (s: String) => s
+  private val tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1))
+
+  @Test
+  def testSetAllowedLateness(): Unit = {
+    val lateness = Time.milliseconds(42)
+    val withLateness = dataStream1.coGroup(dataStream2)
+      .where(keySelector)
+      .equalTo(keySelector)
+      .window(tsAssigner)
+      .allowedLateness(lateness)
+    Assert.assertEquals(lateness.toMilliseconds, withLateness.allowedLateness.toMilliseconds)
+  }
+
+}
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/JoinedStreamsTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/JoinedStreamsTest.scala
new file mode 100644
index 0000000..451882e
--- /dev/null
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/JoinedStreamsTest.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
+import org.apache.flink.streaming.api.windowing.time.Time
+import org.junit.{Assert, Test}
+
+/**
+  * Unit test for [[org.apache.flink.streaming.api.scala.JoinedStreams]].
+  */
+class JoinedStreamsTest {
+  private val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  private val dataStream1 = env.fromElements("a1", "a2", "a3")
+  private val dataStream2 = env.fromElements("a1", "a2")
+  private val keySelector = (s: String) => s
+  private val tsAssigner = TumblingEventTimeWindows.of(Time.milliseconds(1))
+
+  @Test
+  def testSetAllowedLateness(): Unit = {
+    val lateness = Time.milliseconds(42)
+    val withLateness = dataStream1.join(dataStream2)
+      .where(keySelector)
+      .equalTo(keySelector)
+      .window(tsAssigner)
+      .allowedLateness(lateness)
+    Assert.assertEquals(lateness.toMilliseconds, withLateness.allowedLateness.toMilliseconds)
+  }
+}