You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/07/11 08:09:43 UTC
flink git commit: [FLINK-9470] Allow querying the key in KeyedProcessFunction
Repository: flink
Updated Branches:
refs/heads/master 53e665765 -> cde504eb4
[FLINK-9470] Allow querying the key in KeyedProcessFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cde504eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cde504eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cde504eb
Branch: refs/heads/master
Commit: cde504eb401441e589b3f295e40db37a005ee6d9
Parents: 53e6657
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 29 16:46:19 2018 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jul 11 10:09:02 2018 +0200
----------------------------------------------------------------------
.../api/functions/KeyedProcessFunction.java | 6 +++
.../co/KeyedBroadcastProcessFunction.java | 7 +++
.../api/operators/KeyedProcessOperator.java | 6 +++
.../co/CoBroadcastWithKeyedOperator.java | 7 +++
.../api/operators/KeyedProcessOperatorTest.java | 44 +++++++++++++++++
.../co/CoBroadcastWithKeyedOperatorTest.java | 51 ++++++++++++++++++++
6 files changed, 121 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
index a03480b..eb89362 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
@@ -110,6 +110,11 @@ public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
* @param value The record to emit.
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
+
+ /**
+ * Get key of the element being processed.
+ */
+ public abstract K getCurrentKey();
}
/**
@@ -124,6 +129,7 @@ public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
/**
* Get key of the firing timer.
*/
+ @Override
public abstract K getCurrentKey();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
index 9263be0..589ba9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -158,6 +158,12 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
* A {@link TimerService} for querying time and registering timers.
*/
public abstract TimerService timerService();
+
+
+ /**
+ * Get key of the element being processed.
+ */
+ public abstract KS getCurrentKey();
}
/**
@@ -174,6 +180,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
/**
* Get the key of the firing timer.
*/
+ @Override
public abstract KS getCurrentKey();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index b74fdf3..b6171c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -131,6 +131,12 @@ public class KeyedProcessOperator<K, IN, OUT>
output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public K getCurrentKey() {
+ return (K) KeyedProcessOperator.this.getCurrentKey();
+ }
}
private class OnTimerContextImpl extends KeyedProcessFunction<K, IN, OUT>.OnTimerContext {
http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 5f7bbe2..0bfa686 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -288,6 +288,13 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
}
return state;
}
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public KS getCurrentKey() {
+ return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
+ }
+
}
private class OnTimerContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {
http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index c01329e..2032916 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
@@ -43,6 +44,7 @@ import org.junit.rules.ExpectedException;
import java.util.concurrent.ConcurrentLinkedQueue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Tests {@link KeyedProcessOperator}.
@@ -53,6 +55,48 @@ public class KeyedProcessOperatorTest extends TestLogger {
public ExpectedException expectedException = ExpectedException.none();
@Test
+ public void testKeyQuerying() throws Exception {
+
+ class KeyQueryingProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {
+
+ @Override
+ public void processElement(
+ Tuple2<Integer, String> value,
+ Context ctx,
+ Collector<String> out) throws Exception {
+
+ assertTrue("Did not get expected key.", ctx.getCurrentKey().equals(value.f0));
+
+ // we check that we receive this output, to ensure that the assert was actually checked
+ out.collect(value.f1);
+ }
+ }
+
+ KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
+ new KeyedProcessOperator<>(new KeyQueryingProcessFunction());
+
+ try (
+ OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
+ new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0 , BasicTypeInfo.INT_TYPE_INFO)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"), 12L));
+ testHarness.processElement(new StreamRecord<>(Tuple2.of(42, "42"), 13L));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>("5", 12L));
+ expectedOutput.add(new StreamRecord<>("42", 13L));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+ }
+ }
+
+ @Test
public void testTimestampAndWatermarkQuerying() throws Exception {
KeyedProcessOperator<Integer, Integer, String> operator =
http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index c3692d5..715bc9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
@@ -70,6 +71,56 @@ public class CoBroadcastWithKeyedOperatorTest {
BasicTypeInfo.INT_TYPE_INFO
);
+ @Test
+ public void testKeyQuerying() throws Exception {
+
+ class KeyQueryingProcessFunction extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, String> {
+
+ @Override
+ public void processElement(
+ Tuple2<Integer, String> value,
+ ReadOnlyContext ctx,
+ Collector<String> out) throws Exception {
+ assertTrue("Did not get expected key.", ctx.getCurrentKey().equals(value.f0));
+
+ // we check that we receive this output, to ensure that the assert was actually checked
+ out.collect(value.f1);
+
+ }
+
+ @Override
+ public void processBroadcastElement(
+ String value,
+ Context ctx,
+ Collector<String> out) throws Exception {
+
+ }
+ }
+
+ CoBroadcastWithKeyedOperator<Integer, Tuple2<Integer, String>, String, String> operator =
+ new CoBroadcastWithKeyedOperator<>(new KeyQueryingProcessFunction(), Collections.emptyList());
+
+ try (
+ TwoInputStreamOperatorTestHarness<Tuple2<Integer, String>, String, String> testHarness =
+ new KeyedTwoInputStreamOperatorTestHarness<>(operator, (in) -> in.f0 , null, BasicTypeInfo.INT_TYPE_INFO)) {
+
+ testHarness.setup();
+ testHarness.open();
+
+ testHarness.processElement1(new StreamRecord<>(Tuple2.of(5, "5"), 12L));
+ testHarness.processElement1(new StreamRecord<>(Tuple2.of(42, "42"), 13L));
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>("5", 12L));
+ expectedOutput.add(new StreamRecord<>("42", 13L));
+
+ TestHarnessUtil.assertOutputEquals(
+ "Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+ }
+ }
+
/** Test the iteration over the keyed state on the broadcast side. */
@Test
public void testAccessToKeyedStateIt() throws Exception {