You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/04/12 15:50:04 UTC
[5/5] flink git commit: [FLINK-9152] Harmonize BroadcastProcessFunction Context names
[FLINK-9152] Harmonize BroadcastProcessFunction Context names
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/584229dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/584229dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/584229dc
Branch: refs/heads/master
Commit: 584229dc00e62a6a8540b059645269bb04d0ba04
Parents: d5d6842
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Apr 9 16:08:58 2018 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Apr 12 08:04:30 2018 -0700
----------------------------------------------------------------------
.../co/KeyedBroadcastProcessFunction.java | 16 ++++++-------
.../co/CoBroadcastWithKeyedOperator.java | 6 +++--
.../flink/streaming/api/DataStreamTest.java | 4 ++--
.../co/CoBroadcastWithKeyedOperatorTest.java | 24 ++++++++++----------
.../api/scala/BroadcastStateITCase.scala | 4 ++--
.../streaming/runtime/BroadcastStateITCase.java | 4 ++--
6 files changed, 30 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
index 6e6ae5c..9263be0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -39,9 +39,9 @@ import org.apache.flink.util.Collector;
*
* <p>The user has to implement two methods:
* <ol>
- * <li>the {@link #processBroadcastElement(Object, KeyedContext, Collector)} which will be applied to
+ * <li>the {@link #processBroadcastElement(Object, Context, Collector)} which will be applied to
* each element in the broadcast side
- * <li> and the {@link #processElement(Object, KeyedReadOnlyContext, Collector)} which will be applied to the
+ * <li> and the {@link #processElement(Object, ReadOnlyContext, Collector)} which will be applied to the
* non-broadcasted/keyed side.
* </ol>
*
@@ -71,7 +71,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* The context is only valid during the invocation of this method, do not store it.
*
* @param value The stream element.
- * @param ctx A {@link KeyedReadOnlyContext} that allows querying the timestamp of the element,
+ * @param ctx A {@link ReadOnlyContext} that allows querying the timestamp of the element,
* querying the current processing/event time and iterating the broadcast state
* with <b>read-only</b> access.
* The context is only valid during the invocation of this method, do not store it.
@@ -79,7 +79,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
- public abstract void processElement(final IN1 value, final KeyedReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
+ public abstract void processElement(final IN1 value, final ReadOnlyContext ctx, final Collector<OUT> out) throws Exception;
/**
* This method is called for each element in the
@@ -102,7 +102,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* @throws Exception The function may throw exceptions which cause the streaming program
* to fail and go into recovery.
*/
- public abstract void processBroadcastElement(final IN2 value, final KeyedContext ctx, final Collector<OUT> out) throws Exception;
+ public abstract void processBroadcastElement(final IN2 value, final Context ctx, final Collector<OUT> out) throws Exception;
/**
* Called when a timer set using {@link TimerService} fires.
@@ -130,7 +130,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* this also allows to apply a {@link KeyedStateFunction} to the (local) states of all active keys
* in the your backend.
*/
- public abstract class KeyedContext extends Context {
+ public abstract class Context extends BaseBroadcastProcessFunction.Context {
/**
* Applies the provided {@code function} to the state
@@ -152,7 +152,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* this also allows to get a <b>read-only</b> {@link Iterable} over the elements stored in the
* broadcast state and a {@link TimerService} for querying time and registering timers.
*/
- public abstract class KeyedReadOnlyContext extends ReadOnlyContext {
+ public abstract class ReadOnlyContext extends BaseBroadcastProcessFunction.ReadOnlyContext {
/**
* A {@link TimerService} for querying time and registering timers.
@@ -163,7 +163,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
- public abstract class OnTimerContext extends KeyedReadOnlyContext {
+ public abstract class OnTimerContext extends ReadOnlyContext {
/**
* The {@link TimeDomain} of the firing timer, i.e. if it is
http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 871363b..5f7bbe2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.SimpleTimerService;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
+import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction.ReadOnlyContext;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
@@ -141,7 +142,8 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
onTimerContext.timer = null;
}
- private class ReadWriteContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedContext {
+ private class ReadWriteContextImpl
+ extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
private final ExecutionConfig config;
@@ -220,7 +222,7 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
}
}
- private class ReadOnlyContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.KeyedReadOnlyContext {
+ private class ReadOnlyContextImpl extends ReadOnlyContext {
private final ExecutionConfig config;
http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 6326672..4d2d6e1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -879,12 +879,12 @@ public class DataStreamTest extends TestLogger {
bcStream.process(
new KeyedBroadcastProcessFunction<String, Long, String, String>() {
@Override
- public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
// do nothing
}
@Override
- public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// do nothing
}
});
http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index b923b75..c3692d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -139,7 +139,7 @@ public class CoBroadcastWithKeyedOperatorTest {
}
@Override
- public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
// put an element in the broadcast state
ctx.applyToKeyedState(
listStateDesc,
@@ -158,7 +158,7 @@ public class CoBroadcastWithKeyedOperatorTest {
}
@Override
- public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
getRuntimeContext().getListState(listStateDesc).add(value);
}
}
@@ -216,12 +216,12 @@ public class CoBroadcastWithKeyedOperatorTest {
}
@Override
- public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
out.collect("BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}
@Override
- public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ctx.timerService().registerEventTimeTimer(timerTS);
out.collect("NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}
@@ -289,12 +289,12 @@ public class CoBroadcastWithKeyedOperatorTest {
};
@Override
- public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
ctx.output(BROADCAST_TAG, "BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}
@Override
- public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
ctx.output(NON_BROADCAST_TAG, "NON-BR:" + value + " WM:" + ctx.currentWatermark() + " TS:" + ctx.timestamp());
}
}
@@ -380,14 +380,14 @@ public class CoBroadcastWithKeyedOperatorTest {
}
@Override
- public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
// put an element in the broadcast state
final String key = value + "." + keyPostfix;
ctx.getBroadcastState(STATE_DESCRIPTOR).put(key, value);
}
@Override
- public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
Iterable<Map.Entry<String, Integer>> broadcastStateIt = ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries();
Iterator<Map.Entry<String, Integer>> iter = broadcastStateIt.iterator();
@@ -621,7 +621,7 @@ public class CoBroadcastWithKeyedOperatorTest {
}
@Override
- public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
// put an element in the broadcast state
for (String k : keysToRegister) {
ctx.getBroadcastState(STATE_DESCRIPTOR).put(k, value);
@@ -629,7 +629,7 @@ public class CoBroadcastWithKeyedOperatorTest {
}
@Override
- public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
for (Map.Entry<String, Integer> entry : ctx.getBroadcastState(STATE_DESCRIPTOR).immutableEntries()) {
out.collect(entry.toString());
}
@@ -652,12 +652,12 @@ public class CoBroadcastWithKeyedOperatorTest {
private final ValueStateDescriptor<String> valueState = new ValueStateDescriptor<>("any", BasicTypeInfo.STRING_TYPE_INFO);
@Override
- public void processBroadcastElement(Integer value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(Integer value, Context ctx, Collector<String> out) throws Exception {
getRuntimeContext().getState(valueState).value(); // this should fail
}
@Override
- public void processElement(String value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// do nothing
}
})
http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
index 55bb3ba..f1bfced 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/BroadcastStateITCase.scala
@@ -109,7 +109,7 @@ class TestBroadcastProcessFunction(
@throws[Exception]
override def processElement(
value: Long,
- ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedReadOnlyContext,
+ ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#ReadOnlyContext,
out: Collector[String]): Unit = {
val currentTime = nextTimerTimestamp
@@ -121,7 +121,7 @@ class TestBroadcastProcessFunction(
@throws[Exception]
override def processBroadcastElement(
value: String,
- ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#KeyedContext,
+ ctx: KeyedBroadcastProcessFunction[Long, Long, String, String]#Context,
out: Collector[String]): Unit = {
val key = value.split(":")(1).toLong
http://git-wip-us.apache.org/repos/asf/flink/blob/584229dc/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
index 7ccba33..8f442c0 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/BroadcastStateITCase.java
@@ -167,7 +167,7 @@ public class BroadcastStateITCase {
}
@Override
- public void processElement(Long value, KeyedReadOnlyContext ctx, Collector<String> out) throws Exception {
+ public void processElement(Long value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
long currentTime = nextTimerTimestamp;
nextTimerTimestamp++;
ctx.timerService().registerEventTimeTimer(currentTime);
@@ -175,7 +175,7 @@ public class BroadcastStateITCase {
}
@Override
- public void processBroadcastElement(String value, KeyedContext ctx, Collector<String> out) throws Exception {
+ public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
long key = Long.parseLong(value.split(":")[1]);
ctx.getBroadcastState(descriptor).put(key, value);
}