You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/04/12 15:50:01 UTC

[2/5] flink git commit: [FLINK-9152] Use in-class Context objects in BroadcastProcessFunction

[FLINK-9152] Use in-class Context objects in BroadcastProcessFunction

This brings it in line with KeyedBroadcastProcessFunction, which uses
context objects defined in KeyedBroadcastProcessFunction itself. The
context objects here add no functionality, but we still define them so
that, for consistency, the methods don't refer to the base class
implementations.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0838bbea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0838bbea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0838bbea

Branch: refs/heads/master
Commit: 0838bbeaafed791d96bfd1e8e1f8a5e486c35325
Parents: 584229d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 9 16:12:43 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:04:30 2018 -0700

----------------------------------------------------------------------
 .../api/functions/co/BroadcastProcessFunction.java      | 12 ++++++++++++
 .../operators/co/CoBroadcastWithNonKeyedOperator.java   |  4 ++--
 2 files changed, 14 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0838bbea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
index 257ea83..9e5540e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/BroadcastProcessFunction.java
@@ -90,4 +90,16 @@ public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadc
 	 *                   to fail and go into recovery.
 	 */
 	public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * A {@link BaseBroadcastProcessFunction.Context context} available to the broadcast side of
+	 * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream}.
+	 */
+	public abstract class Context extends BaseBroadcastProcessFunction.Context {}
+
+	/**
+	 * A {@link BaseBroadcastProcessFunction.Context context} available to the non-keyed side of
+	 * a {@link org.apache.flink.streaming.api.datastream.BroadcastConnectedStream} (if any).
+	 */
+	public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0838bbea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
index 7e1e431..5bed3bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithNonKeyedOperator.java
@@ -23,8 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.BroadcastState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
-import org.apache.flink.streaming.api.functions.co.BaseBroadcastProcessFunction;
 import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction.Context;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -113,7 +113,7 @@ public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
 		currentWatermark = mark.getTimestamp();
 	}
 
-	private class ReadWriteContextImpl extends BaseBroadcastProcessFunction.Context {
+	private class ReadWriteContextImpl extends Context {
 
 		private final ExecutionConfig config;