You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/04/12 22:38:25 UTC

[1/5] flink git commit: [FLINK-9152] Harmonize BroadcastProcessFunction Context names

Repository: flink
Updated Branches:
  refs/heads/release-1.5 1f21af6fd -> 833b83b3c


[FLINK-9152] Harmonize BroadcastProcessFunction Context names


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/51beea6e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/51beea6e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/51beea6e

Branch: refs/heads/release-1.5
Commit: 51beea6e23200f64507b6c1996b4fa66c4a2f819
Parents: 179ae58
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 9 16:08:58 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:08:31 2018 -0700

----------------------------------------------------------------------
 .../co/KeyedBroadcastProcessFunction.java       | 16 ++++++-------
 .../co/CoBroadcastWithKeyedOperator.java        |  6 +++--
 .../flink/streaming/api/DataStreamTest.java     |  4 ++--
 .../co/CoBroadcastWithKeyedOperatorTest.java    | 24 ++++++++++----------
 .../api/scala/BroadcastStateITCase.scala        |  4 ++--
 .../streaming/runtime/BroadcastStateITCase.java |  4 ++--
 6 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/51beea6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
index 6e6ae5c..9263be0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -39,9 +39,9 @@ import org.apache.flink.util.Collector;
  *
  * <p>The user has to implement two methods:
  * <ol>
- *     <li>the {@link #processBroadcastElement(Object, KeyedContext, Collector)} which will be applied to
+ *     <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
  *     each element in the broadcast side
- *     <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the
+ *     <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the
  *     non-broadcasted/keyed side.
  * </ol>
  *
@@ -71,7 +71,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * The context is only valid during the invocation of this method, do not store it.
 	 *
 	 * @param value The stream element.
-	 * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element,
+	 * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
 	 *            querying the current processing/event time and iterating the broadcast state
 	 *            with <b>read-only</b> access.
 	 *            The context is only valid during the invocation of this method, do not store it.
@@ -79,7 +79,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
+	public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
 
 	/**
 	 * This method is called for each element in the
@@ -102,7 +102,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	public abstract void processBroadcastElement(final IN2 value, final KeyedContext ctx, final Collector<OUT> out) throws Exception;
+	public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
 
 	/**
 	 * Called when a timer set using {@link TimerService} fires.
@@ -130,7 +130,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * this also allows to apply a {@link KeyedStateFunction} to the (local) states of all active keys
 	 * in the your backend.
 	 */
-	public abstract class KeyedContext extends Context {
+	public abstract class Context extends BaseBroadcastProcessFunction.Context {
 
 		/**
 		 * Applies the provided {@code function} to the state
@@ -152,7 +152,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the
 	 * broadcast state and a {@link TimerService} for querying time and registering timers.
 	 */
-	public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
+	public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {
 
 		/**
 		 * A {@link TimerService} for querying time and registering timers.
@@ -163,7 +163,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	/**
 	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	public abstract class OnTimerContext extends KeyedReadOnlyContext {
+	public abstract class OnTimerContext extends ReadOnlyContext {
 
 		/**
 		 * The {@link TimeDomain} of the firing timer, i.e. if it is

http://git-wip-us.apache.org/repos/asf/flink/blob/51beea6e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 871363b..5f7bbe2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.SimpleTimerService;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.ReadOnlyContext;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -141,7 +142,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 		onTimerContext.timer = null;
 	}
 
-	private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext {
+	private class ReadWriteContextImpl
+			extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
 
 		private final ExecutionConfig config;
 
@@ -220,7 +222,7 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 		}
 	}
 
-	private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext {
+	private class ReadOnlyContextImpl extends ReadOnlyContext {
 
 		private final ExecutionConfig config;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/51beea6e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 6326672..4d2d6e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -879,12 +879,12 @@ public class DataStreamTest extends TestLogger {
 		bcStream.process(
 				new KeyedBroadcastProcessFunction<String, Long, String, String>() {
 					@Override
-					public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
+					public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
 						// do nothing
 					}
 
 					@Override
-					public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+					public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 						// do nothing
 					}
 				});

http://git-wip-us.apache.org/repos/asf/flink/blob/51beea6e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index b923b75..c3692d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -139,7 +139,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			// put an element in the broadcast state
 			ctx.applyToKeyedState(
 					listStateDesc,
@@ -158,7 +158,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			getRuntimeContext().getListState(listStateDesc).add(value);
 		}
 	}
@@ -216,12 +216,12 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			out.collect("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			ctx.timerService().registerEventTimeTimer(timerTS);
 			out.collect("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
@@ -289,12 +289,12 @@ public class CoBroadcastWithKeyedOperatorTest {
 		};
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			ctx.output(BROADCAST_TAG, "BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			ctx.output(NON_BROADCAST_TAG, "NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
 	}
@@ -380,14 +380,14 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			// put an element in the broadcast state
 			final String key = value + "." + keyPostfix;
 			ctx.getBroadcastState(STATE_DESCRIPTOR).put(key, value);
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			Iterable<Map.Entry<String, Integer>> broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
 			Iterator<Map.Entry<String, Integer>> iter = broadcastStateIt.iterator();
 
@@ -621,7 +621,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			// put an element in the broadcast state
 			for (String k : keysToRegister) {
 				ctx.getBroadcastState(STATE_DESCRIPTOR).put(k, value);
@@ -629,7 +629,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			for (Map.Entry<String, Integer> entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
 				out.collect(entry.toString());
 			}
@@ -652,12 +652,12 @@ public class CoBroadcastWithKeyedOperatorTest {
 							private final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
 
 							@Override
-							public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+							public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 								getRuntimeContext().getState(valueState).value(); // this should fail
 							}
 
 							@Override
-							public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+							public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 								// do nothing
 							}
 						})

http://git-wip-us.apache.org/repos/asf/flink/blob/51beea6e/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index 55bb3ba..f1bfced 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -109,7 +109,7 @@ class TestBroadcastProcessFunction(
   @throws[Exception]
   override def processElement(
       value: Long,
-      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext,
+      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#ReadOnlyContext,
       out: Collector[String]): Unit = {
 
     val currentTime = nextTimerTimestamp
@@ -121,7 +121,7 @@ class TestBroadcastProcessFunction(
   @throws[Exception]
   override def processBroadcastElement(
       value: String,
-      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext,
+      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#Context,
       out: Collector[String]): Unit = {
 
     val key = value.split(":")(1).toLong

http://git-wip-us.apache.org/repos/asf/flink/blob/51beea6e/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 7ccba33..8f442c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -167,7 +167,7 @@ public class BroadcastStateITCase {
 		}
 
 		@Override
-		public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			long currentTime = nextTimerTimestamp;
 			nextTimerTimestamp++;
 			ctx.timerService().registerEventTimeTimer(currentTime);
@@ -175,7 +175,7 @@ public class BroadcastStateITCase {
 		}
 
 		@Override
-		public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
 			long key = Long.parseLong(value.split(":")[1]);
 			ctx.getBroadcastState(descriptor).put(key, value);
 		}


[3/5] flink git commit: [FLINK-9152] Use in-class Context objects in BroadcastProcessFunction

Posted by al...@apache.org.
[FLINK-9152] Use in-class Context objects in BroadcastProcessFunction

This brings it in line with KeyedBroadcastProcessFunction, which uses
context objects defined in KeyedBroadcastProcessFunction. The context
objects here have no added functionality but we still define them here
so that the methods don't refer to the base class implementations for
consistency.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/24a8f518
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/24a8f518
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/24a8f518

Branch: refs/heads/release-1.5
Commit: 24a8f5186cec112feb06ca1d38282e5a3cd9f085
Parents: 51beea6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 9 16:12:43 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:08:32 2018 -0700

----------------------------------------------------------------------
 .../api/functions/co/BroadcastProcessFunction.java      | 12 ++++++++++++
 .../operators/co/CoBroadcastWithNonKeyedOperator.java   |  4 ++--
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/24a8f518/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
index 257ea83..9e5540e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
@@ -90,4 +90,16 @@ public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadc
 	 *                   to fail and go into recovery.
 	 */
 	public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * A {@link BaseBroadcastProcessFunction.Context context} available to the broadcast side of
+	 * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
+	 */
+	public abstract class Context extends BaseBroadcastProcessFunction.Context {}
+
+	/**
+	 * A {@link BaseBroadcastProcessFunction.Context context} available to the non-keyed side of
+	 * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
+	 */
+	public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/24a8f518/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
index 7e1e431..5bed3bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
-import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction.Context;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -113,7 +113,7 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
 		currentWatermark = mark.getTimestamp();
 	}
 
-	private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context {
+	private class ReadWriteContextImpl extends Context {
 
 		private final ExecutionConfig config;
 


[5/5] flink git commit: [FLINK-9152] Add simple ITCase for non-keyed Broadcast Connect translation

Posted by al...@apache.org.
[FLINK-9152] Add simple ITCase for non-keyed Broadcast Connect translation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/833b83b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/833b83b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/833b83b3

Branch: refs/heads/release-1.5
Commit: 833b83b3c243a30172e8ad9f3c57b1390a94c90d
Parents: 179b9bc
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 11 17:47:03 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:08:32 2018 -0700

----------------------------------------------------------------------
 .../streaming/runtime/BroadcastStateITCase.java | 93 +++++++++++++++++++-
 1 file changed, 89 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/833b83b3/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 8f442c0..9400614 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -47,7 +48,7 @@ import static org.junit.Assert.assertEquals;
 public class BroadcastStateITCase {
 
 	@Test
-	public void testConnectWithBroadcastTranslation() throws Exception {
+	public void testKeyedWithBroadcastTranslation() throws Exception {
 
 		final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>(
 				"broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
@@ -90,7 +91,7 @@ public class BroadcastStateITCase {
 
 		// the timestamp should be high enough to trigger the timer after all the elements arrive.
 		final DataStream<String> output = srcOne.connect(broadcast).process(
-				new TestBroadcastProcessFunction(100000L, expected));
+				new TestKeyedBroadcastProcessFunction(100000L, expected));
 
 		output
 				.addSink(new TestSink(expected.size()))
@@ -98,6 +99,58 @@ public class BroadcastStateITCase {
 		env.execute();
 	}
 
+	@Test
+	public void testBroadcastTranslation() throws Exception {
+
+		final MapStateDescriptor<Long, String> utterDescriptor = new MapStateDescriptor<>(
+			"broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+		);
+
+		final Map<Long, String> expected = new HashMap<>();
+		expected.put(0L, "test:0");
+		expected.put(1L, "test:1");
+		expected.put(2L, "test:2");
+		expected.put(3L, "test:3");
+		expected.put(4L, "test:4");
+		expected.put(5L, "test:5");
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+		final DataStream<Long> srcOne = env.generateSequence(0L, 5L)
+			.assignTimestampsAndWatermarks(new CustomWmEmitter<Long>() {
+
+				private static final long serialVersionUID = -8500904795760316195L;
+
+				@Override
+				public long extractTimestamp(Long element, long previousElementTimestamp) {
+					return element;
+				}
+			});
+
+		final DataStream<String> srcTwo = env.fromCollection(expected.values())
+			.assignTimestampsAndWatermarks(new CustomWmEmitter<String>() {
+
+				private static final long serialVersionUID = -2148318224248467213L;
+
+				@Override
+				public long extractTimestamp(String element, long previousElementTimestamp) {
+					return Long.parseLong(element.split(":")[1]);
+				}
+			});
+
+		final BroadcastStream<String> broadcast = srcTwo.broadcast(utterDescriptor);
+
+		// the timestamp should be high enough to trigger the timer after all the elements arrive.
+		final DataStream<String> output = srcOne.connect(broadcast).process(
+			new TestBroadcastProcessFunction());
+
+		output
+			.addSink(new TestSink(0))
+			.setParallelism(1);
+		env.execute();
+	}
+
 	private static class TestSink extends RichSinkFunction<String> {
 
 		private static final long serialVersionUID = 7252508825104554749L;
@@ -141,7 +194,7 @@ public class BroadcastStateITCase {
 	 * while on the non-broadcast side, it sets a timer to fire at some point in the future. Finally, when the onTimer
 	 * method is called (i.e. when the timer fires), we verify that the result is the expected one.
 	 */
-	private static class TestBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> {
+	private static class TestKeyedBroadcastProcessFunction extends KeyedBroadcastProcessFunction<Long, Long, String, String> {
 
 		private static final long serialVersionUID = 7616910653561100842L;
 
@@ -152,7 +205,7 @@ public class BroadcastStateITCase {
 
 		private transient MapStateDescriptor<Long, String> descriptor;
 
-		TestBroadcastProcessFunction(final long initialTimerTimestamp, final Map<Long, String> expectedBroadcastState) {
+		TestKeyedBroadcastProcessFunction(final long initialTimerTimestamp, final Map<Long, String> expectedBroadcastState) {
 			expectedState = expectedBroadcastState;
 			nextTimerTimestamp = initialTimerTimestamp;
 		}
@@ -194,4 +247,36 @@ public class BroadcastStateITCase {
 			out.collect(Long.toString(timestamp));
 		}
 	}
+
+	/**
+	 * This doesn't do much but we use it to verify that translation of non-keyed broadcast connect
+	 * works.
+	 */
+	private static class TestBroadcastProcessFunction extends
+			BroadcastProcessFunction<Long, String, String> {
+
+		private static final long serialVersionUID = 7616910653561100842L;
+
+		private transient MapStateDescriptor<Long, String> descriptor;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			descriptor = new MapStateDescriptor<>(
+				"broadcast-state", BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO
+			);
+		}
+
+		@Override
+		public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
+		}
+
+		@Override
+		public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
+			long key = Long.parseLong(value.split(":")[1]);
+			ctx.getBroadcastState(descriptor).put(key, value);
+		}
+	}
+
 }


[2/5] flink git commit: [FLINK-9160] Make subclasses of RuntimeContext internal that should be internal

Posted by al...@apache.org.
[FLINK-9160] Make subclasses of RuntimeContext internal that should be internal


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/179ae583
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/179ae583
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/179ae583

Branch: refs/heads/release-1.5
Commit: 179ae583e2d01f78d5c214198ffc6ffb75148c7d
Parents: 1f21af6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 11 19:40:04 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:08:31 2018 -0700

----------------------------------------------------------------------
 .../api/common/functions/util/AbstractRuntimeUDFContext.java     | 3 ++-
 .../flink/streaming/api/operators/StreamingRuntimeContext.java   | 4 ++--
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/179ae583/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index d0d7e52..6246e80 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.functions.util;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.TaskInfo;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A standalone implementation of the {@link RuntimeContext}, created by runtime UDF operators.
  */
-@PublicEvolving
+@Internal
 public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 
 	private final TaskInfo taskInfo;

http://git-wip-us.apache.org/repos/asf/flink/blob/179ae583/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
index cb40214..1f42ccf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.util.AbstractRuntimeUDFContext;
@@ -50,7 +50,7 @@ import java.util.Map;
  * Implementation of the {@link org.apache.flink.api.common.functions.RuntimeContext},
  * for streaming operators.
  */
-@PublicEvolving
+@Internal
 public class StreamingRuntimeContext extends AbstractRuntimeUDFContext {
 
 	/** The operator to which this function belongs. */


[4/5] flink git commit: [FLINK-9152] Fix error message on BroadcastConnectedStream

Posted by al...@apache.org.
[FLINK-9152] Fix error message on BroadcastConnectedStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/179b9bce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/179b9bce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/179b9bce

Branch: refs/heads/release-1.5
Commit: 179b9bce2fcdbdfa72b4b89648c48fe084b56fe2
Parents: 24a8f51
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Apr 11 17:38:33 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:08:32 2018 -0700

----------------------------------------------------------------------
 .../flink/streaming/api/datastream/BroadcastConnectedStream.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/179b9bce/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
index e5454ef..cb7d8c9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/BroadcastConnectedStream.java
@@ -158,7 +158,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
 
 		Preconditions.checkNotNull(function);
 		Preconditions.checkArgument(inputStream1 instanceof KeyedStream,
-				"A KeyedBroadcastProcessFunction can only be used with a keyed stream as the second input.");
+				"A KeyedBroadcastProcessFunction can only be used on a keyed stream.");
 
 		TwoInputStreamOperator<IN1, IN2, OUT> operator =
 				new CoBroadcastWithKeyedOperator<>(clean(function), broadcastStateDescriptors);
@@ -209,7 +209,7 @@ public class BroadcastConnectedStream<IN1, IN2> {
 
 		Preconditions.checkNotNull(function);
 		Preconditions.checkArgument(!(inputStream1 instanceof KeyedStream),
-				"A BroadcastProcessFunction can only be used with a non-keyed stream as the second input.");
+				"A BroadcastProcessFunction can only be used on a non-keyed stream.");
 
 		TwoInputStreamOperator<IN1, IN2, OUT> operator =
 				new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);