Posted to commits@flink.apache.org by uc...@apache.org on 2017/02/06 16:10:45 UTC

[1/2] flink git commit: [FLINK-5618] [docs] Add queryable state (user) documentation

Repository: flink
Updated Branches:
  refs/heads/release-1.2 39e6049c5 -> a9a500479


[FLINK-5618] [docs] Add queryable state (user) documentation

This closes #3275.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9a50047
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9a50047
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9a50047

Branch: refs/heads/release-1.2
Commit: a9a50047905a749eacea41e7cb312907f6331809
Parents: f4fe654
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 6 16:19:06 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Feb 6 17:10:38 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/queryable_state.md | 235 +++++++++++++++++++++++++++++++-
 1 file changed, 233 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a9a50047/docs/dev/stream/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/queryable_state.md b/docs/dev/stream/queryable_state.md
index 5f47df8..6c34536 100644
--- a/docs/dev/stream/queryable_state.md
+++ b/docs/dev/stream/queryable_state.md
@@ -26,7 +26,238 @@ under the License.
 {:toc}
 
 <div class="alert alert-warning">
-  <strong>Note:</strong> The client APIs for queryable state are currently in an evolving state and there are <strong>no guarantees</strong> made about stability of the provided interfaces. It is likely that there will be breaking API changes on the client side in the upcoming Flink versions.
+  <strong>Note:</strong> The client APIs for queryable state are currently in an evolving state and
+  there are <strong>no guarantees</strong> made about stability of the provided interfaces. It is
+  likely that there will be breaking API changes on the client side in the upcoming Flink versions.
 </div>
 
-Coming up...
\ No newline at end of file
+In a nutshell, this feature allows users to query Flink's managed partitioned state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state)) from outside of
+Flink. For some scenarios, queryable state thus eliminates the need for distributed
+operations/transactions with external systems such as key-value stores which are often the
+bottleneck in practice.
+
+<div class="alert alert-danger">
+  <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather
+  than synchronizing with the operator and potentially blocking its operation. Since any state
+  backend using Java heap space, e.g. <code>MemoryStateBackend</code> or
+  <code>FsStateBackend</code>, does not work with copies when retrieving values but instead hands
+  out references to the stored values, read-modify-write patterns are unsafe and may cause the
+  queryable state server to fail due to concurrent modifications.
+  The <code>RocksDBStateBackend</code> is safe from these issues.
+</div>
+
+## Making State Queryable
+
+In order to make state queryable, first, the queryable state server needs to be enabled globally
+by setting the `query.server.enable` configuration parameter to `true` (current default).
+Then, appropriate state needs to be made queryable by either
+* a convenience `QueryableStateStream` which behaves like a sink and offers incoming values as
+queryable state, or
+* using `StateDescriptor#setQueryable(String queryableStateName)` for making keyed state of an
+operator queryable.
+
+The following sections explain the use of these two.
+
+### Queryable State Stream
+
+A `KeyedStream` may offer its values as queryable state by using the following methods:
+
+{% highlight java %}
+// ValueState
+QueryableStateStream asQueryableState(
+    String queryableStateName,
+    ValueStateDescriptor stateDescriptor)
+
+// Shortcut for explicit ValueStateDescriptor variant
+QueryableStateStream asQueryableState(String queryableStateName)
+
+// FoldingState
+QueryableStateStream asQueryableState(
+    String queryableStateName,
+    FoldingStateDescriptor stateDescriptor)
+
+// ReducingState
+QueryableStateStream asQueryableState(
+    String queryableStateName,
+    ReducingStateDescriptor stateDescriptor)
+{% endhighlight %}
+
+
+<div class="alert alert-info">
+  <strong>Note:</strong> There is no queryable list state sink as it would result in an ever-growing
+  list which may not be cleaned up and thus will eventually consume too much memory.
+</div>
+
+A call to these methods returns a `QueryableStateStream`, which currently only holds the name as
+well as the value and key serializer for the queryable state stream. It is comparable to a sink
+and cannot be further transformed.
+
+Internally, the `QueryableStateStream` gets translated to an operator, which uses all incoming
+records to update the queryable state instance.
+In a program like the following, all records of the keyed stream will be used to update the state
+instance, i.e. either via `ValueState#update(value)` or `AppendingState#add(value)` depending on
+the chosen state variant:
+{% highlight java %}
+stream.keyBy(0).asQueryableState("query-name")
+{% endhighlight %}
+This acts like the Scala API's `flatMapWithState`.
+
+### Managed Keyed State
+
+Managed keyed state of an operator
+(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state.html#using-managed-keyed-state))
+can be made queryable by setting the appropriate state descriptor queryable via
+`StateDescriptor#setQueryable(String queryableStateName)` as in the example below.
+{% highlight java %}
+ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+        new ValueStateDescriptor<>(
+                "average", // the state name
+                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
+                Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+descriptor.setQueryable("query-name"); // queryable state name
+{% endhighlight %}
+<div class="alert alert-info">
+  <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily
+  and is only used for queries. It does not have to be identical to the state's own name.
+</div>
+
+
+## Querying State
+
+The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that
+serve the state internally. It needs to be set up with a valid `JobManager` address and port and is
+created as follows:
+
+{% highlight java %}
+final Configuration config = new Configuration();
+config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
+config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+
+QueryableStateClient client = new QueryableStateClient(config);
+{% endhighlight %}
+
+The query method is this:
+
+{% highlight java %}
+Future<byte[]> getKvState(
+    JobID jobID,
+    String queryableStateName,
+    int keyHashCode,
+    byte[] serializedKeyAndNamespace)
+{% endhighlight %}
+
+A call to this method returns a `Future` eventually holding the serialized state value for the
+queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The
+`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the
+`serializedKeyAndNamespace` is the serialized key and namespace.
+<div class="alert alert-info">
+  <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
+  to be shut down via <code>QueryableStateClient#shutdown()</code> when unused in order to free
+  resources.
+</div>
+
+The current implementation is still fairly low-level in the sense that it only works with
+serialized data, both for providing the key/namespace and for the returned results. It is the
+responsibility of the user (or some follow-up utilities) to set up the serializers for this. The
+advantage of this is that the query services do not have to deal with class loading issues
+and the like.
+
+There are some serialization utils for key/namespace and value serialization included in
+`KvStateRequestSerializer`.
+
+## Example
+
+The following example extends the `CountWindowAverage` example from
+[Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state.html#using-managed-keyed-state)
+by making it queryable and showing how to query this value:
+
+{% highlight java %}
+public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
+
+    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
+
+    @Override
+    public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
+        Tuple2<Long, Long> currentSum = sum.value();
+        currentSum.f0 += 1;
+        currentSum.f1 += input.f1;
+        sum.update(currentSum);
+
+        if (currentSum.f0 >= 2) {
+            out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
+            sum.clear();
+        }
+    }
+
+    @Override
+    public void open(Configuration config) {
+        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+                new ValueStateDescriptor<>(
+                        "average", // the state name
+                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
+                        Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
+        descriptor.setQueryable("query-name");
+        sum = getRuntimeContext().getState(descriptor);
+    }
+}
+{% endhighlight %}
+
+Once this function is used in a job, retrieve the job ID and then query the current state of
+this operator for any key (here, any `Long key`) as follows:
+
+{% highlight java %}
+final Configuration config = new Configuration();
+config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
+config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+
+QueryableStateClient client = new QueryableStateClient(config);
+
+final TypeSerializer<Long> keySerializer =
+        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
+final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
+        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
+
+final byte[] serializedKey =
+        KvStateRequestSerializer.serializeKeyAndNamespace(
+                key, keySerializer,
+                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
+
+Future<byte[]> serializedResult =
+        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
+
+// now wait for the result and return it
+final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
+byte[] serializedValue = Await.result(serializedResult, duration);
+Tuple2<Long, Long> value =
+        KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
+{% endhighlight %}
+
+
+## Configuration
+
+The following configuration parameters influence the queryable state server's and client's
+behaviour. They are defined in `QueryableStateOptions`.
+
+### Server
+* `query.server.enable`: flag to indicate whether to start the queryable state server
+* `query.server.port`: port to bind internal `KvStateServer` to (0 => pick random available port)
+* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots)
+* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots)
+
+### Client (`QueryableStateClient`)
+* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores)
+* `query.client.lookup.num-retries`: number of retries on location lookup failures
+* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis)
+
+## Limitations
+
+* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register
+queryable state on startup and unregister it on dispose. In future versions, it is desirable to
+decouple this in order to allow queries after a task finishes and to speed up recovery via state
+replication.
+* Notifications about available KvState happen via a simple tell. This should be improved to be
+more robust with asks and acknowledgements in future.
+* The server and client keep track of statistics for queries. These are currently disabled by
+default as they would not be exposed anywhere. As soon as there is better support to publish these
+numbers via the Metrics system, we should enable the stats.
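
The warning in queryable_state.md about heap-based state backends handing out references instead of copies can be illustrated without any Flink classes. The following is a minimal plain-Java sketch under stated assumptions, not Flink code: the class `HeapStateAliasingSketch` and all of its methods are hypothetical stand-ins for a heap-backed keyed state table, and the example is single-threaded so that the effect is deterministic. It only shows that a caller holding a reference observes later in-place updates, which is why read-modify-write on such state is unsafe once a concurrent query thread is reading the same object.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java sketch (no Flink classes): handing out references vs. copies of stored values. */
public class HeapStateAliasingSketch {

    // Hypothetical stand-in for a heap-based keyed state table: key -> {count, sum}.
    private final Map<Long, long[]> table = new HashMap<>();

    public void put(long key, long count, long sum) {
        table.put(key, new long[] {count, sum});
    }

    // Returns the stored object itself (assumption: models a backend that does not copy).
    public long[] getReference(long key) {
        return table.get(key);
    }

    // Returns a defensive copy, so later in-place updates are not visible to the caller.
    public long[] getCopy(long key) {
        return table.get(key).clone();
    }

    // Read-modify-write in place, similar in spirit to "currentSum.f0 += 1" in the example operator.
    public void addInPlace(long key, long value) {
        long[] current = table.get(key);
        current[0] += 1;
        current[1] += value;
    }

    public static void main(String[] args) {
        HeapStateAliasingSketch state = new HeapStateAliasingSketch();
        state.put(1L, 1, 3);

        long[] byReference = state.getReference(1L); // what a "query" would hold without copying
        long[] byCopy = state.getCopy(1L);           // what a "query" would hold with copying

        state.addInPlace(1L, 5); // the "operator" updates the stored value in place

        System.out.println("reference sees: " + byReference[0] + ", " + byReference[1]);
        System.out.println("copy sees:      " + byCopy[0] + ", " + byCopy[1]);
    }
}
```

In this sketch the reference observes the updated pair (2, 8) while the copy still holds (1, 3). With a real concurrent reader, the reference case could additionally expose a half-applied update, for example the new count together with the old sum.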


[2/2] flink git commit: [docs][state] add missing Java syntax highlighting to documentation

Posted by uc...@apache.org.
[docs][state] add missing Java syntax highlighting to documentation


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4fe654e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4fe654e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4fe654e

Branch: refs/heads/release-1.2
Commit: f4fe654e7e1457a0f9f39f128545438baf46ffbc
Parents: 39e6049
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Mon Feb 6 11:03:45 2017 +0100
Committer: Ufuk Celebi <uc...@apache.org>
Committed: Mon Feb 6 17:10:38 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state.md | 88 +++++++++++++++++++++++--------------------
 1 file changed, 48 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4fe654e/docs/dev/stream/state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state.md b/docs/dev/stream/state.md
index 124ce68..4d1cfab 100644
--- a/docs/dev/stream/state.md
+++ b/docs/dev/stream/state.md
@@ -230,9 +230,11 @@ while `(test2, 2)` will go to task 1.
 
 The `ListCheckpointed` interface requires the implementation of two methods:
 
-    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
+{% highlight java %}
+List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
 
-    void restoreState(List<T> state) throws Exception;
+void restoreState(List<T> state) throws Exception;
+{% endhighlight %}
 
 On `snapshotState()` the operator should return a list of objects to checkpoint and
 `restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
@@ -242,9 +244,11 @@ return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
 
 The `CheckpointedFunction` interface also requires the implementation of two methods:
 
-    void snapshotState(FunctionSnapshotContext context) throws Exception;
+{% highlight java %}
+void snapshotState(FunctionSnapshotContext context) throws Exception;
 
-    void initializeState(FunctionInitializationContext context) throws Exception;
+void initializeState(FunctionInitializationContext context) throws Exception;
+{% endhighlight %}
 
 Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
 or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
@@ -253,57 +257,61 @@ only the place where different types of state are initialized, but also where st
 This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
 uses state to buffer elements before sending them to the outside world:
 
-    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
-            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
+{% highlight java %}
+public class BufferingSink
+        implements SinkFunction<Tuple2<String, Integer>>,
+                   CheckpointedFunction,
+                   CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
 
-        private final int threshold;
+    private final int threshold;
 
-        private transient ListState<Tuple2<String, Integer>> checkpointedState;
+    private transient ListState<Tuple2<String, Integer>> checkpointedState;
 
-        private List<Tuple2<String, Integer>> bufferedElements;
+    private List<Tuple2<String, Integer>> bufferedElements;
 
-        public BufferingSink(int threshold) {
-            this.threshold = threshold;
-            this.bufferedElements = new ArrayList<>();
-        }
+    public BufferingSink(int threshold) {
+        this.threshold = threshold;
+        this.bufferedElements = new ArrayList<>();
+    }
 
-        @Override
-        public void invoke(Tuple2<String, Integer> value) throws Exception {
-            bufferedElements.add(value);
-            if (bufferedElements.size() == threshold) {
-                for (Tuple2<String, Integer> element: bufferedElements) {
-                    // send it to the sink
-                }
-                bufferedElements.clear();
+    @Override
+    public void invoke(Tuple2<String, Integer> value) throws Exception {
+        bufferedElements.add(value);
+        if (bufferedElements.size() == threshold) {
+            for (Tuple2<String, Integer> element: bufferedElements) {
+                // send it to the sink
             }
+            bufferedElements.clear();
         }
+    }
 
-        @Override
-        public void snapshotState(FunctionSnapshotContext context) throws Exception {
-            checkpointedState.clear();
-            for (Tuple2<String, Integer> element : bufferedElements) {
-                checkpointedState.add(element);
-            }
+    @Override
+    public void snapshotState(FunctionSnapshotContext context) throws Exception {
+        checkpointedState.clear();
+        for (Tuple2<String, Integer> element : bufferedElements) {
+            checkpointedState.add(element);
         }
+    }
 
-        @Override
-        public void initializeState(FunctionInitializationContext context) throws Exception {
-            checkpointedState = context.getOperatorStateStore().
-                getSerializableListState("buffered-elements");
+    @Override
+    public void initializeState(FunctionInitializationContext context) throws Exception {
+        checkpointedState = context.getOperatorStateStore().
+            getSerializableListState("buffered-elements");
 
-            if (context.isRestored()) {
-                for (Tuple2<String, Integer> element : checkpointedState.get()) {
-                    bufferedElements.add(element);
-                }
+        if (context.isRestored()) {
+            for (Tuple2<String, Integer> element : checkpointedState.get()) {
+                bufferedElements.add(element);
             }
         }
+    }
 
-        @Override
-        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
-            // this is from the CheckpointedRestoring interface.
-            this.bufferedElements.addAll(state);
-        }
+    @Override
+    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
+        // this is from the CheckpointedRestoring interface.
+        this.bufferedElements.addAll(state);
     }
+}
+{% endhighlight %}
 
 
 The `initializeState` method takes as argument a `FunctionInitializationContext`. This is used to initialize