You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2017/03/11 02:32:29 UTC

flink git commit: [FLINK-6023] fix process function doc examples

Repository: flink
Updated Branches:
  refs/heads/master 7ef068ccc -> 354a13edf


[FLINK-6023] fix process function doc examples

This closes #3510.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/354a13ed
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/354a13ed
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/354a13ed

Branch: refs/heads/master
Commit: 354a13edff6821c57418802048de52ad5ba3d557
Parents: 7ef068c
Author: Mauro Cortellazzi <ma...@radicalbit.io>
Authored: Fri Mar 10 15:12:55 2017 +0100
Committer: Kurt Young <ku...@apache.org>
Committed: Sat Mar 11 10:31:38 2017 +0800

----------------------------------------------------------------------
 docs/dev/stream/process_function.md | 56 ++++++++++++++++----------------
 1 file changed, 28 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/354a13ed/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
index 1f93f68..25de9a4 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -96,7 +96,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.RichProcessFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
 import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
 import org.apache.flink.util.Collector;
@@ -123,7 +123,7 @@ public class CountWithTimestamp {
 /**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
-public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
+public class CountWithTimeoutFunction extends ProcessFunction<Tuple2<String, String>, Tuple2<String, Long>> {
 
     /** The state that is maintained by this process function */
     private ValueState<CountWithTimestamp> state;
@@ -134,7 +134,7 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
     }
 
     @Override
-    public void processElement(Tuple2<String, Long> value, Context ctx, Collector<Tuple2<String, Long>> out)
+    public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, Long>> out)
             throws Exception {
 
         // retrieve the current count
@@ -154,7 +154,7 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
         state.update(current);
 
         // schedule the next timer 60 seconds from the current event time
-        ctx.timerService().registerEventTimeTimer(current.timestamp + 60000);
+        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
     }
 
     @Override
@@ -165,8 +165,8 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
         CountWithTimestamp result = state.value();
 
         // check if this is an outdated timer or the latest timer
-        if (timestamp == result.lastModified) {
-            // emit the state
+        if (timestamp == result.lastModified + 60000) {
+            // emit the state on timeout
             out.collect(new Tuple2<String, Long>(result.key, result.count));
         }
     }
@@ -176,43 +176,43 @@ public class CountWithTimeoutFunction extends RichProcessFunction<Tuple2<String,
 
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.ProcessFunction.Context;
-import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext;
-import org.apache.flink.util.Collector;
+import org.apache.flink.api.common.state.ValueState
+import org.apache.flink.api.common.state.ValueStateDescriptor
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.ProcessFunction.Context
+import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
+import org.apache.flink.util.Collector
 
 // the source data stream
-DataStream<Tuple2<String, String>> stream = ...;
+val stream: DataStream[Tuple2[String, String]] = ...
 
 // apply the process function onto a keyed stream
-DataStream<Tuple2<String, Long>> result = stream
-    .keyBy(0)
-    .process(new CountWithTimeoutFunction());
+val result: DataStream[Tuple2[String, Long]] = stream
+  .keyBy(0)
+  .process(new CountWithTimeoutFunction())
 
 /**
- * The data type stored in the state
- */
+  * The data type stored in the state
+  */
 case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
 
 /**
- * The implementation of the ProcessFunction that maintains the count and timeouts
- */
-class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long)] {
+  * The implementation of the ProcessFunction that maintains the count and timeouts
+  */
+class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
 
   /** The state that is maintained by this process function */
-  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext()
-      .getState(new ValueStateDescriptor<>("myState", clasOf[CountWithTimestamp]))
+  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
+    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
 
 
-  override def processElement(value: (String, Long), ctx: Context, out: Collector[(String, Long)]): Unit = {
+  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
     // initialize or retrieve/update the state
 
     val current: CountWithTimestamp = state.value match {
       case null =>
-        CountWithTimestamp(key, 1, ctx.timestamp)
-      case CountWithTimestamp(key, count, time) =>
+        CountWithTimestamp(value._1, 1, ctx.timestamp)
+      case CountWithTimestamp(key, count, lastModified) =>
         CountWithTimestamp(key, count + 1, ctx.timestamp)
     }
 
@@ -220,12 +220,12 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
     state.update(current)
 
     // schedule the next timer 60 seconds from the current event time
-    ctx.timerService.registerEventTimeTimer(current.timestamp + 60000)
+    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
   }
 
   override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
     state.value match {
-      case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
+      case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp + 60000) =>
         out.collect((key, count))
       case _ =>
     }