You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/09/25 13:50:30 UTC
[2/3] flink git commit: [FLINK-7635] Support side output in ProcessWindowFunction
[FLINK-7635] Support side output in ProcessWindowFunction
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c151a537
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c151a537
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c151a537
Branch: refs/heads/master
Commit: c151a537c205d20db598354ba5afc4f228c746c3
Parents: 68a99d7
Author: Bowen Li <bo...@gmail.com>
Authored: Tue Sep 19 23:35:34 2017 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 25 12:06:51 2017 +0200
----------------------------------------------------------------------
.../InternalProcessApplyWindowContext.java | 6 +++
.../windowing/ProcessWindowFunction.java | 9 ++++
.../api/operators/ProcessOperator.java | 5 +--
.../operators/windowing/WindowOperator.java | 7 +++
.../functions/InternalProcessWindowContext.java | 6 +++
.../functions/InternalWindowFunction.java | 3 ++
.../scala/function/ProcessWindowFunction.scala | 9 ++--
.../ScalaProcessWindowFunctionWrapper.scala | 5 +++
.../streaming/api/scala/SideOutputITCase.scala | 46 ++++++++++++++++++++
.../streaming/runtime/SideOutputITCase.java | 35 +++++++++++++++
10 files changed, 124 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
index 47a2e3a..3d52e35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/InternalProcessApplyWindowContext.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.functions.windowing;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
/**
* Internal reusable context wrapper.
@@ -64,4 +65,9 @@ public class InternalProcessApplyWindowContext<IN, OUT, KEY, W extends Window>
public KeyedStateStore globalState() {
return context.globalState();
}
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ context.output(outputTag, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
index 506b610..08ed49c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* Base abstract class for functions that are evaluated over keyed (grouped) windows using a context
@@ -85,5 +86,13 @@ public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> exte
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
+
+ /**
+ * Emits a record to the side output identified by the {@link OutputTag}.
+ *
+ * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+ * @param value The record to emit.
+ */
+ public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 5c9e8fc..b353a63 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -73,10 +73,7 @@ public class ProcessOperator<IN, OUT>
this.currentWatermark = mark.getTimestamp();
}
- private class ContextImpl
- extends ProcessFunction<IN, OUT>.Context
- implements TimerService {
-
+ private class ContextImpl extends ProcessFunction<IN, OUT>.Context implements TimerService {
private StreamRecord<IN> element;
private final ProcessingTimeService processingTimeService;
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b14739f..fd90e65 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -774,6 +774,13 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
public KeyedStateStore globalState() {
return WindowOperator.this.getKeyedStateStore();
}
+
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ if (outputTag == null) {
+ throw new IllegalArgumentException("OutputTag must not be null.");
+ }
+ output.collect(outputTag, new StreamRecord<>(value, window.maxTimestamp()));
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
index 9505332..4d5d1c6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalProcessWindowContext.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.apache.flink.util.OutputTag;
/**
* Internal reusable context wrapper.
@@ -66,4 +67,9 @@ public class InternalProcessWindowContext<IN, OUT, KEY, W extends Window>
public KeyedStateStore globalState() {
return internalContext.globalState();
}
+
+ @Override
+ public <X> void output(OutputTag<X> outputTag, X value) {
+ internalContext.output(outputTag, value);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
index 0999565..c304d7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/functions/InternalWindowFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
/**
* Internal interface for functions that are evaluated over keyed (grouped) windows.
@@ -63,5 +64,7 @@ public interface InternalWindowFunction<IN, OUT, KEY, W extends Window> extends
KeyedStateStore windowState();
KeyedStateStore globalState();
+
+ <X> void output(OutputTag<X> outputTag, X value);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
index d2075db..7ae51ea 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/ProcessWindowFunction.scala
@@ -18,11 +18,10 @@
package org.apache.flink.streaming.api.scala.function
-import java.io.Serializable
-
import org.apache.flink.annotation.PublicEvolving
import org.apache.flink.api.common.functions.AbstractRichFunction
import org.apache.flink.api.common.state.KeyedStateStore
+import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.windowing.windows.Window
import org.apache.flink.util.Collector
@@ -88,6 +87,10 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
* State accessor for per-key global state.
*/
def globalState: KeyedStateStore
- }
+ /**
+ * Emits `value` to the side output identified by the given [[OutputTag]].
+ */
+ def output[X](outputTag: OutputTag[X], value: X): Unit
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
index bc4b7dd..98b050c 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/function/util/ScalaProcessWindowFunctionWrapper.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.windowing.{ProcessWindowFunction => JProcessWindowFunction}
import org.apache.flink.streaming.api.functions.windowing.{ProcessAllWindowFunction => JProcessAllWindowFunction}
+import org.apache.flink.streaming.api.scala.OutputTag
import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction => ScalaProcessWindowFunction}
import org.apache.flink.streaming.api.scala.function.{ProcessAllWindowFunction => ScalaProcessAllWindowFunction}
import org.apache.flink.streaming.api.windowing.windows.Window
@@ -56,6 +57,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def windowState = context.windowState()
override def globalState = context.globalState()
+
+ override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
}
func.process(key, ctx, elements.asScala, out)
}
@@ -71,6 +74,8 @@ final class ScalaProcessWindowFunctionWrapper[IN, OUT, KEY, W <: Window](
override def windowState = context.windowState()
override def globalState = context.globalState()
+
+ override def output[X](outputTag: OutputTag[X], value: X) = context.output(outputTag, value)
}
func.clear(ctx)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
index 29bcbcf..f09323c 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/SideOutputITCase.scala
@@ -234,6 +234,52 @@ class SideOutputITCase extends StreamingMultipleProgramsTestBase {
assertEquals(util.Arrays.asList(("3", 3), ("4", 4)), lateResultSink.getResult)
}
+ /**
+ * Test ProcessWindowFunction side output.
+ */
+ @Test
+ def testProcessWindowFunctionSideOutput() {
+ val resultSink = new TestListResultSink[String]
+ val sideOutputResultSink = new TestListResultSink[String]
+
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setParallelism(1)
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+
+ val dataStream = env.fromElements(("1", 1), ("2", 2), ("5", 5), ("3", 3), ("4", 4))
+
+
+ val sideOutputTag = OutputTag[String]("side")
+
+ val windowOperator = dataStream
+ .assignTimestampsAndWatermarks(new TestAssigner)
+ .keyBy(i => i._1)
+ .window(TumblingEventTimeWindows.of(Time.milliseconds(1)))
+ .process(new ProcessWindowFunction[(String, Int), String, String, TimeWindow] {
+ override def process(
+ key: String,
+ context: Context,
+ elements: Iterable[(String, Int)],
+ out: Collector[String]): Unit = {
+ for (in <- elements) {
+ out.collect(in._1)
+ context.output(sideOutputTag, "sideout-" + in._1)
+ }
+ }
+ })
+
+ windowOperator
+ .getSideOutput(sideOutputTag)
+ .addSink(sideOutputResultSink)
+
+ windowOperator.addSink(resultSink)
+
+ env.execute()
+
+ assertEquals(util.Arrays.asList("1", "2", "5"), resultSink.getResult)
+ assertEquals(util.Arrays.asList("sideout-1", "sideout-2", "sideout-5"),
+ sideOutputResultSink.getResult)
+ }
}
class TestAssigner extends AssignerWithPunctuatedWatermarks[(String, Int)] {
http://git-wip-us.apache.org/repos/asf/flink/blob/c151a537/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
index f73bf42..f74f8ff 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -547,4 +548,38 @@ public class SideOutputITCase extends StreamingMultipleProgramsTestBase implemen
assertEquals(Collections.singletonList(3), lateResultSink.getSortedResult());
}
+ @Test
+ public void testProcessWindowFunctionSideOutput() throws Exception {
+ TestListResultSink<Integer> resultSink = new TestListResultSink<>();
+ TestListResultSink<String> sideOutputResultSink = new TestListResultSink<>();
+
+ StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
+ see.setParallelism(3);
+ see.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+
+ DataStream<Integer> dataStream = see.fromCollection(elements);
+
+ OutputTag<String> sideOutputTag = new OutputTag<String>("side"){};
+
+ SingleOutputStreamOperator<Integer> windowOperator = dataStream
+ .assignTimestampsAndWatermarks(new TestWatermarkAssigner())
+ .keyBy(new TestKeySelector())
+ .timeWindow(Time.milliseconds(1), Time.milliseconds(1))
+ .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
+ out.collect(key);
+ context.output(sideOutputTag, "sideout-" + key);
+ }
+ });
+
+ windowOperator.getSideOutput(sideOutputTag).addSink(sideOutputResultSink);
+ windowOperator.addSink(resultSink);
+ see.execute();
+
+ assertEquals(Arrays.asList("sideout-1", "sideout-2", "sideout-5"), sideOutputResultSink.getSortedResult());
+ assertEquals(Arrays.asList(1, 2, 5), resultSink.getSortedResult());
+ }
}