You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2018/07/11 08:09:43 UTC

flink git commit: [FLINK-9470] Allow querying the key in KeyedProcessFunction

Repository: flink
Updated Branches:
  refs/heads/master 53e665765 -> cde504eb4


[FLINK-9470] Allow querying the key in KeyedProcessFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cde504eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cde504eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cde504eb

Branch: refs/heads/master
Commit: cde504eb401441e589b3f295e40db37a005ee6d9
Parents: 53e6657
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 29 16:46:19 2018 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Jul 11 10:09:02 2018 +0200

----------------------------------------------------------------------
 .../api/functions/KeyedProcessFunction.java     |  6 +++
 .../co/KeyedBroadcastProcessFunction.java       |  7 +++
 .../api/operators/KeyedProcessOperator.java     |  6 +++
 .../co/CoBroadcastWithKeyedOperator.java        |  7 +++
 .../api/operators/KeyedProcessOperatorTest.java | 44 +++++++++++++++++
 .../co/CoBroadcastWithKeyedOperatorTest.java    | 51 ++++++++++++++++++++
 6 files changed, 121 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
index a03480b..eb89362 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
@@ -110,6 +110,11 @@ public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
 		 * @param value The record to emit.
 		 */
 		public abstract <X> void output(OutputTag<X> outputTag, X value);
+
+		/**
+		 * Get key of the element being processed.
+		 */
+		public abstract K getCurrentKey();
 	}
 
 	/**
@@ -124,6 +129,7 @@ public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction
 		/**
 		 * Get key of the firing timer.
 		 */
+		@Override
 		public abstract K getCurrentKey();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
index 9263be0..589ba9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedBroadcastProcessFunction.java
@@ -158,6 +158,12 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 		 * A {@link TimerService} for querying time and registering timers.
 		 */
 		public abstract TimerService timerService();
+
+
+		/**
+		 * Get key of the element being processed.
+		 */
+		public abstract KS getCurrentKey();
 	}
 
 	/**
@@ -174,6 +180,7 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends B
 		/**
 		 * Get the key of the firing timer.
 		 */
+		@Override
 		public abstract KS getCurrentKey();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index b74fdf3..b6171c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -131,6 +131,12 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
 		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public K getCurrentKey() {
+			return (K) KeyedProcessOperator.this.getCurrentKey();
+		}
 	}
 
 	private class OnTimerContextImpl extends KeyedProcessFunction<K, IN, OUT>.OnTimerContext {

http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
index 5f7bbe2..0bfa686 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java
@@ -288,6 +288,13 @@ public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
 			}
 			return state;
 		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public KS getCurrentKey() {
+			return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
+		}
+
 	}
 
 	private class OnTimerContextImpl extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {

http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index c01329e..2032916 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
@@ -43,6 +44,7 @@ import org.junit.rules.ExpectedException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests {@link KeyedProcessOperator}.
@@ -53,6 +55,48 @@ public class KeyedProcessOperatorTest extends TestLogger {
 	public ExpectedException expectedException = ExpectedException.none();
 
 	@Test
+	public void testKeyQuerying() throws Exception {
+
+		class KeyQueryingProcessFunction extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {
+
+			@Override
+			public void processElement(
+				Tuple2<Integer, String> value,
+				Context ctx,
+				Collector<String> out) throws Exception {
+
+				assertTrue("Did not get expected key.", ctx.getCurrentKey().equals(value.f0));
+
+				// we check that we receive this output, to ensure that the assert was actually checked
+				out.collect(value.f1);
+			}
+		}
+
+		KeyedProcessOperator<Integer, Tuple2<Integer, String>, String> operator =
+			new KeyedProcessOperator<>(new KeyQueryingProcessFunction());
+
+		try (
+			OneInputStreamOperatorTestHarness<Tuple2<Integer, String>, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, (in) -> in.f0 , BasicTypeInfo.INT_TYPE_INFO)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5"), 12L));
+			testHarness.processElement(new StreamRecord<>(Tuple2.of(42, "42"), 13L));
+
+			ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+			expectedOutput.add(new StreamRecord<>("5", 12L));
+			expectedOutput.add(new StreamRecord<>("42", 13L));
+
+			TestHarnessUtil.assertOutputEquals(
+				"Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+		}
+	}
+
+	@Test
 	public void testTimestampAndWatermarkQuerying() throws Exception {
 
 		KeyedProcessOperator<Integer, Integer, String> operator =

http://git-wip-us.apache.org/repos/asf/flink/blob/cde504eb/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
index c3692d5..715bc9d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
@@ -70,6 +71,56 @@ public class CoBroadcastWithKeyedOperatorTest {
 					BasicTypeInfo.INT_TYPE_INFO
 			);
 
+	@Test
+	public void testKeyQuerying() throws Exception {
+
+		class KeyQueryingProcessFunction extends KeyedBroadcastProcessFunction<Integer, Tuple2<Integer, String>, String, String> {
+
+			@Override
+			public void processElement(
+				Tuple2<Integer, String> value,
+				ReadOnlyContext ctx,
+				Collector<String> out) throws Exception {
+				assertTrue("Did not get expected key.", ctx.getCurrentKey().equals(value.f0));
+
+				// we check that we receive this output, to ensure that the assert was actually checked
+				out.collect(value.f1);
+
+			}
+
+			@Override
+			public void processBroadcastElement(
+				String value,
+				Context ctx,
+				Collector<String> out) throws Exception {
+
+			}
+		}
+
+		CoBroadcastWithKeyedOperator<Integer, Tuple2<Integer, String>, String, String> operator =
+			new CoBroadcastWithKeyedOperator<>(new KeyQueryingProcessFunction(), Collections.emptyList());
+
+		try (
+			TwoInputStreamOperatorTestHarness<Tuple2<Integer, String>, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(operator, (in) -> in.f0 , null, BasicTypeInfo.INT_TYPE_INFO)) {
+
+			testHarness.setup();
+			testHarness.open();
+
+			testHarness.processElement1(new StreamRecord<>(Tuple2.of(5, "5"), 12L));
+			testHarness.processElement1(new StreamRecord<>(Tuple2.of(42, "42"), 13L));
+
+			ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+			expectedOutput.add(new StreamRecord<>("5", 12L));
+			expectedOutput.add(new StreamRecord<>("42", 13L));
+
+			TestHarnessUtil.assertOutputEquals(
+				"Output was not correct.",
+				expectedOutput,
+				testHarness.getOutput());
+		}
+	}
+
 	/** Test the iteration over the keyed state on the broadcast side. */
 	@Test
 	public void testAccessToKeyedStateIt() throws Exception {