You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/11 02:32:29 UTC
flink git commit: [FLINK-6023] fix process function doc examples
Repository: flink
Updated Branches:
refs/heads/master 7ef068ccc -> 354a13edf
[FLINK-6023] fix process function doc examples
This closes #3510.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354a13ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354a13ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354a13ed
Branch: refs/heads/master
Commit: 354a13edff6821c57418802048de52ad5ba3d557
Parents: 7ef068c
Author: Mauro Cortellazzi <ma...@radicalbit.io>
Authored: Fri Mar 10 15:12:55 2017 +0100
Committer: Kurt Young <ku...@apache.org>
Committed: Sat Mar 11 10:31:38 2017 +0800
----------------------------------------------------------------------
docs/dev/stream/process_function.md | 56 ++++++++++++++++----------------
1 file changed, 28 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/354a13ed/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
index 1f93f68..25de9a4 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -96,7 +96,7 @@ import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.RichProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
import org.apache.flink.util.Collector;
@@ -123,7 +123,7 @@ public class CountWithTimestamp {
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
-public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
+public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
/** The state that is maintained by this process function */
private ValueState<CountWithTimestamp> state;
@@ -134,7 +134,7 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
}
@Override
- public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out)
+ public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
throws Exception {
// retrieve the current count
@@ -154,7 +154,7 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
state.update(current);
// schedule the next timer 60 seconds from the current event time
- ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
+ ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
}
@Override
@@ -165,8 +165,8 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
CountWithTimestamp result = state.value();
// check if this is an outdated timer or the latest timer
- if (timestamp == result.lastModified) {
- // emit the state
+ if (timestamp == result.lastModified + 60000) {
+ // emit the state on timeout
out.collect(new Tuple2<String, Long>(result.key, result.count));
}
}
@@ -176,43 +176,43 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
<div data-lang="scala" markdown="1">
{% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
// the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
// apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
- .keyBy(0)
- .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+ .keyBy(0)
+ .process(new CountWithTimeoutFunction())
/**
- * The data type stored in the state
- */
+ * The data type stored in the state
+ */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+ * The implementation of the ProcessFunction that maintains the count and timeouts
+ */
+class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
/** The state that is maintained by this process function */
- lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
- .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+ lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+ .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
- override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
+ override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
- CountWithTimestamp(key, 1, ctx.timestamp)
- case CountWithTimestamp(key, count, time) =>
+ CountWithTimestamp(value._1, 1, ctx.timestamp)
+ case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
@@ -220,12 +220,12 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
state.update(current)
// schedule the next timer 60 seconds from the current event time
- ctx.timerService.registerEventTimeTimer(current.timestamp + 60000)
+ ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
- case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
+ case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}