You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/04/12 15:50:04 UTC

[5/5] flink git commit: [FLINK-9152] Harmonize BroadcastProcessFunction Context names

[FLINK-9152] Harmonize BroadcastProcessFunction Context names


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/584229dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/584229dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/584229dc

Branch: refs/heads/master
Commit: 584229dc00e62a6a8540b059645269bb04d0ba04
Parents: d5d6842
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 9 16:08:58 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:04:30 2018 -0700

----------------------------------------------------------------------
 .../co/KeyedBroadcastProcessFunction.java       | 16 ++++++-------
 .../co/CoBroadcastWithKeyedOperator.java        |  6 +++--
 .../flink/streaming/api/DataStreamTest.java     |  4 ++--
 .../co/CoBroadcastWithKeyedOperatorTest.java    | 24 ++++++++++----------
 .../api/scala/BroadcastStateITCase.scala        |  4 ++--
 .../streaming/runtime/BroadcastStateITCase.java |  4 ++--
 6 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
index 6e6ae5c..9263be0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -39,9 +39,9 @@ import org.apache.flink.util.Collector;
  *
  * <p>The user has to implement two methods:
  * <ol>
- *     <li>the {@link #processBroadcastElement(Object, KeyedContext, Collector)} which will be applied to
+ *     <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
  *     each element in the broadcast side
- *     <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the
+ *     <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the
  *     non-broadcasted/keyed side.
  * </ol>
  *
@@ -71,7 +71,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * The context is only valid during the invocation of this method, do not store it.
 	 *
 	 * @param value The stream element.
-	 * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element,
+	 * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
 	 *            querying the current processing/event time and iterating the broadcast state
 	 *            with <b>read-only</b> access.
 	 *            The context is only valid during the invocation of this method, do not store it.
@@ -79,7 +79,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
+	public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
 
 	/**
 	 * This method is called for each element in the
@@ -102,7 +102,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	public abstract void processBroadcastElement(final IN2 value, final KeyedContext ctx, final Collector<OUT> out) throws Exception;
+	public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
 
 	/**
 	 * Called when a timer set using {@link TimerService} fires.
@@ -130,7 +130,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * this also allows to apply a {@link KeyedStateFunction} to the (local) states of all active keys
 	 * in the your backend.
 	 */
-	public abstract class KeyedContext extends Context {
+	public abstract class Context extends BaseBroadcastProcessFunction.Context {
 
 		/**
 		 * Applies the provided {@code function} to the state
@@ -152,7 +152,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	 * this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the
 	 * broadcast state and a {@link TimerService} for querying time and registering timers.
 	 */
-	public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
+	public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {
 
 		/**
 		 * A {@link TimerService} for querying time and registering timers.
@@ -163,7 +163,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 	/**
 	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	public abstract class OnTimerContext extends KeyedReadOnlyContext {
+	public abstract class OnTimerContext extends ReadOnlyContext {
 
 		/**
 		 * The {@link TimeDomain} of the firing timer, i.e. if it is

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 871363b..5f7bbe2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.SimpleTimerService;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.ReadOnlyContext;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -141,7 +142,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 		onTimerContext.timer = null;
 	}
 
-	private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext {
+	private class ReadWriteContextImpl
+			extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
 
 		private final ExecutionConfig config;
 
@@ -220,7 +222,7 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 		}
 	}
 
-	private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext {
+	private class ReadOnlyContextImpl extends ReadOnlyContext {
 
 		private final ExecutionConfig config;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 6326672..4d2d6e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -879,12 +879,12 @@ public class DataStreamTest extends TestLogger {
 		bcStream.process(
 				new KeyedBroadcastProcessFunction<String, Long, String, String>() {
 					@Override
-					public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
+					public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
 						// do nothing
 					}
 
 					@Override
-					public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+					public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 						// do nothing
 					}
 				});

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index b923b75..c3692d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -139,7 +139,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			// put an element in the broadcast state
 			ctx.applyToKeyedState(
 					listStateDesc,
@@ -158,7 +158,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			getRuntimeContext().getListState(listStateDesc).add(value);
 		}
 	}
@@ -216,12 +216,12 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			out.collect("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			ctx.timerService().registerEventTimeTimer(timerTS);
 			out.collect("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
@@ -289,12 +289,12 @@ public class CoBroadcastWithKeyedOperatorTest {
 		};
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			ctx.output(BROADCAST_TAG, "BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			ctx.output(NON_BROADCAST_TAG, "NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
 		}
 	}
@@ -380,14 +380,14 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			// put an element in the broadcast state
 			final String key = value + "." + keyPostfix;
 			ctx.getBroadcastState(STATE_DESCRIPTOR).put(key, value);
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			Iterable<Map.Entry<String, Integer>> broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
 			Iterator<Map.Entry<String, Integer>> iter = broadcastStateIt.iterator();
 
@@ -621,7 +621,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 			// put an element in the broadcast state
 			for (String k : keysToRegister) {
 				ctx.getBroadcastState(STATE_DESCRIPTOR).put(k, value);
@@ -629,7 +629,7 @@ public class CoBroadcastWithKeyedOperatorTest {
 		}
 
 		@Override
-		public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			for (Map.Entry<String, Integer> entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
 				out.collect(entry.toString());
 			}
@@ -652,12 +652,12 @@ public class CoBroadcastWithKeyedOperatorTest {
 							private final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
 
 							@Override
-							public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+							public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
 								getRuntimeContext().getState(valueState).value(); // this should fail
 							}
 
 							@Override
-							public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+							public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 								// do nothing
 							}
 						})

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index 55bb3ba..f1bfced 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -109,7 +109,7 @@ class TestBroadcastProcessFunction(
   @throws[Exception]
   override def processElement(
       value: Long,
-      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext,
+      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#ReadOnlyContext,
       out: Collector[String]): Unit = {
 
     val currentTime = nextTimerTimestamp
@@ -121,7 +121,7 @@ class TestBroadcastProcessFunction(
   @throws[Exception]
   override def processBroadcastElement(
       value: String,
-      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext,
+      ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#Context,
       out: Collector[String]): Unit = {
 
     val key = value.split(":")(1).toLong

http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 7ccba33..8f442c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -167,7 +167,7 @@ public class BroadcastStateITCase {
 		}
 
 		@Override
-		public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+		public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
 			long currentTime = nextTimerTimestamp;
 			nextTimerTimestamp++;
 			ctx.timerService().registerEventTimeTimer(currentTime);
@@ -175,7 +175,7 @@ public class BroadcastStateITCase {
 		}
 
 		@Override
-		public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
+		public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
 			long key = Long.parseLong(value.split(":")[1]);
 			ctx.getBroadcastState(descriptor).put(key, value);
 		}