Posted to commits@flink.apache.org by tz...@apache.org on 2017/06/13 05:17:34 UTC

[15/15] flink git commit: [FLINK-6848] [doc] Update managed state docs to include Scala snippets

[FLINK-6848] [doc] Update managed state docs to include Scala snippets

Add an example of how to work with managed state in Scala.

This closes #4072.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23c82e3c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23c82e3c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23c82e3c

Branch: refs/heads/master
Commit: 23c82e3cc9d632c17850f3c7d2b3a1ab0a0cd5cb
Parents: 8b26460
Author: Fokko Driesprong <fo...@godatadriven.com>
Authored: Sun Jun 4 16:08:44 2017 +0200
Committer: Tzu-Li (Gordon) Tai <tz...@apache.org>
Committed: Tue Jun 13 07:14:38 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/state.md | 212 ++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 203 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23c82e3c/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 0025fae..65d0d75 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -115,7 +115,7 @@ of elements that are added to the state. The interface is the same as for `ListS
 added using `add(T)` are folded into an aggregate using a specified `FoldFunction`.
 
 * `MapState<UK, UV>`: This keeps a list of mappings. You can put key-value pairs into the state and
-retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or 
+retrieve an `Iterable` over all currently stored mappings. Mappings are added using `put(UK, UV)` or
 `putAll(Map<UK, UV>)`. The value associated with a user key can be retrieved using `get(UK)`. The iterable
 views for mappings, keys and values can be retrieved using `entries()`, `keys()` and `values()` respectively.
 
@@ -152,6 +152,8 @@ is available in a `RichFunction` has these methods for accessing state:
 
 This is an example `FlatMapFunction` that shows how all of the parts fit together:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
@@ -201,6 +203,66 @@ env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2
 
 // the printed output will be (1,4) and (1,5)
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
+
+  private var sum: ValueState[(Long, Long)] = _
+
+  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
+
+    // access the state value
+    val tmpCurrentSum = sum.value
+
+    // If it hasn't been used before, it will be null
+    val currentSum = if (tmpCurrentSum != null) {
+      tmpCurrentSum
+    } else {
+      (0L, 0L)
+    }
+
+    // update the count
+    val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
+
+    // update the state
+    sum.update(newSum)
+
+    // if the count reaches 2, emit the average and clear the state
+    if (newSum._1 >= 2) {
+      out.collect((input._1, newSum._2 / newSum._1))
+      sum.clear()
+    }
+  }
+
+  override def open(parameters: Configuration): Unit = {
+    sum = getRuntimeContext.getState(
+      new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
+    )
+  }
+}
+
+
+object ExampleCountWindowAverage extends App {
+  val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+  env.fromCollection(List(
+    (1L, 3L),
+    (1L, 5L),
+    (1L, 7L),
+    (1L, 4L),
+    (1L, 2L)
+  )).keyBy(_._1)
+    .flatMap(new CountWindowAverage())
+    .print()
+  // the printed output will be (1,4) and (1,5)
+
+  env.execute("ExampleManagedState")
+}
+{% endhighlight %}
+</div>
+</div>
 
 This example implements a poor man's counting window. We key the tuples by the first field
 (in the example all have the same key `1`). The function stores the count and a running sum in
@@ -268,6 +330,8 @@ Below is an example of a stateful `SinkFunction` that uses `CheckpointedFunction
 to buffer elements before sending them to the outside world. It demonstrates
 the basic even-split redistribution list state:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public class BufferingSink
         implements SinkFunction<Tuple2<String, Integer>>,
@@ -311,7 +375,7 @@ public class BufferingSink
                 "buffered-elements",
                 TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
                 Tuple2.of(0L, 0L));
-                
+
         checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 
         if (context.isRestored()) {
@@ -328,6 +392,59 @@ public class BufferingSink
     }
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class BufferingSink(threshold: Int = 0)
+  extends SinkFunction[(String, Int)]
+    with CheckpointedFunction
+    with CheckpointedRestoring[List[(String, Int)]] {
+
+  @transient
+  private var checkpointedState: ListState[(String, Int)] = null
+
+  private val bufferedElements = ListBuffer[(String, Int)]()
+
+  override def invoke(value: (String, Int)): Unit = {
+    bufferedElements += value
+    if (bufferedElements.size == threshold) {
+      for (element <- bufferedElements) {
+        // send it to the sink
+      }
+      bufferedElements.clear()
+    }
+  }
+
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    checkpointedState.clear()
+    for (element <- bufferedElements) {
+      checkpointedState.add(element)
+    }
+  }
+
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    val descriptor = new ListStateDescriptor[(String, Int)](
+      "buffered-elements",
+      TypeInformation.of(new TypeHint[(String, Int)]() {})
+    )
+
+    checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+    if(context.isRestored) {
+      for(element <- checkpointedState.get()) {
+        bufferedElements += element
+      }
+    }
+  }
+
+  override def restoreState(state: List[(String, Int)]): Unit = {
+    bufferedElements ++= state
+  }
+}
+{% endhighlight %}
+</div>
+</div>
 
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize
 the non-keyed state "containers". These are a container of type `ListState` where the non-keyed state objects
@@ -337,16 +454,32 @@ Note how the state is initialized, similar to keyed state,
 with a `StateDescriptor` that contains the state name and information
 about the type of the value that the state holds:
 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
     new ListStateDescriptor<>(
         "buffered-elements",
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
-        Tuple2.of(0L, 0L));
+        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
 
 checkpointedState = context.getOperatorStateStore().getListState(descriptor);
 {% endhighlight %}
 
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+val descriptor = new ListStateDescriptor[(String, Int)](
+    "buffered-elements",
+    TypeInformation.of(new TypeHint[(String, Int)]() {})
+)
+
+checkpointedState = context.getOperatorStateStore.getListState(descriptor)
+
+{% endhighlight %}
+</div>
+</div>
 The naming convention of the state access methods contain its redistribution
 pattern followed by its state structure. For example, to use list state with the
 union redistribution scheme on restore, access the state by using `getUnionListState(descriptor)`.
@@ -385,6 +518,8 @@ Stateful sources require a bit more care as opposed to other operators.
 In order to make the updates to the state and output collection atomic (required for exactly-once semantics
 on failure/recovery), the user is required to get a lock from the source's context.
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
 public static class CounterSource
         extends RichParallelSourceFunction<Long>
@@ -426,6 +561,46 @@ public static class CounterSource
     }
 }
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CounterSource
+       extends RichParallelSourceFunction[Long]
+       with ListCheckpointed[Long] {
+
+  @volatile
+  private var isRunning = true
+
+  private var offset = 0L
+
+  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
+    val lock = ctx.getCheckpointLock
+
+    while (isRunning) {
+      // output and state update are atomic
+      lock.synchronized({
+        ctx.collect(offset)
+
+        offset += 1
+      })
+    }
+  }
+
+  override def cancel(): Unit = isRunning = false
+
+  override def restoreState(state: util.List[Long]): Unit =
+    for (s <- state) {
+      offset = s
+    }
+
+  override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
+    Collections.singletonList(offset)
+
+}
+{% endhighlight %}
+</div>
+</div>
 
 Some operators might need the information when a checkpoint is fully acknowledged by Flink to communicate that with the outside world. In this case see the `org.apache.flink.runtime.state.CheckpointListener` interface.
 
@@ -433,7 +608,7 @@ Some operators might need the information when a checkpoint is fully acknowledge
 
 This section is targeted as a guideline for users who require the use of custom serialization for their state, covering how
 to provide a custom serializer and how to handle upgrades to the serializer for compatibility. If you're simply using
-Flink's own serializers, this section is irrelevant and can be skipped. 
+Flink's own serializers, this section is irrelevant and can be skipped.
 
 ### Using custom serializers
 
@@ -444,14 +619,33 @@ to specify the state's name, as well as information about the type of the state.
 It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
 simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:
 
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
 {% highlight java %}
+public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};
+
 ListStateDescriptor<Tuple2<String, Integer>> descriptor =
     new ListStateDescriptor<>(
         "state-name",
-        new TypeSerializer<> {...});
+        new CustomTypeSerializer());
 
 checkpointedState = getRuntimeContext().getListState(descriptor);
 {% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
+
+val descriptor = new ListStateDescriptor[(String, Integer)](
+    "state-name",
+    new CustomTypeSerializer
+)
+
+checkpointedState = getRuntimeContext.getListState(descriptor)
+{% endhighlight %}
+</div>
+</div>
 
 Note that Flink writes state serializers along with the state as metadata. In certain cases on restore (see following
 subsections), the written serializer needs to be deserialized and used. Therefore, it is recommended to avoid using
@@ -542,7 +736,7 @@ The above cases can be translated to code by returning one of the following from
 
   * **`CompatibilityResult.compatible()`**: This acknowledges that the new serializer is compatible, or has been reconfigured to
     be compatible, and Flink can proceed with the job with the serializer as is.
-    
+
   * **`CompatibilityResult.requiresMigration()`**: This acknowledges that the serializer is incompatible, or cannot be
     reconfigured to be compatible, and requires a state migration before the new serializer can be used. State migration
     is performed by using the previous serializer to read the restored state bytes to objects, and then serialized again
@@ -551,7 +745,7 @@ The above cases can be translated to code by returning one of the following from
   * **`CompatibilityResult.requiresMigration(TypeDeserializer deserializer)`**: This acknowledgement has equivalent semantics
     to `CompatibilityResult.requiresMigration()`, but in the case that the previous serializer cannot be found or loaded
     to read the restored state bytes for the migration, a provided `TypeDeserializer` can be used as a fallback resort.
-  
+
 <span class="label label-danger">Attention</span> Currently, as of Flink 1.3, if the result of the compatibility check
 acknowledges that state migration needs to be performed, the job simply fails to restore from the checkpoint as state
 migration is currently not available. The ability to migrate state will be introduced in future releases.
@@ -560,7 +754,7 @@ migration is currently not available. The ability to migrate state will be intro
 
 Since `TypeSerializer`s and `TypeSerializerConfigSnapshot`s are written as part of checkpoints along with the state
 values, the availability of the classes within the classpath may affect restore behaviour.
- 
+
 `TypeSerializer`s are directly written into checkpoints using Java Object Serialization. In the case that the new
 serializer acknowledges that it is incompatible and requires state migration, it will be required to be present to be
 able to read the restored state bytes. Therefore, if the original serializer class no longer exists or has been modified