Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/07 13:14:09 UTC

[2/2] flink git commit: [FLINK-7822][QS][doc] Update Queryable State docs.

[FLINK-7822][QS][doc] Update Queryable State docs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81999545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81999545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81999545

Branch: refs/heads/release-1.4
Commit: 819995454611be6a85e2933318d053b2c25a18f7
Parents: aab36f9
Author: kkloudas <kk...@gmail.com>
Authored: Mon Nov 6 17:21:45 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Tue Nov 7 14:12:59 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/state/queryable_state.md | 265 ++++++++++++++------------
 1 file changed, 144 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81999545/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index 8012f67..af646df 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -32,38 +32,68 @@ under the License.
   likely that there will be breaking API changes on the client side in the upcoming Flink versions.
 </div>
 
-In a nutshell, this feature allows users to query Flink's managed partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for distributed
-operations/transactions with external systems such as key-value stores which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) to the outside world and
+allows the user to query a job's state from outside Flink. For some scenarios, queryable state
+eliminates the need for distributed operations/transactions with external systems such as key-value
+stores, which are often the bottleneck in practice. In addition, this feature may be particularly
+useful for debugging.
 
 <div class="alert alert-warning">
-  <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather
-  than synchronizing with the operator and potentially blocking its operation. Since any state
-  backend using Java heap space, e.g. MemoryStateBackend or
-  FsStateBackend, does not work with copies when retrieving values but instead directly
-  references the stored values, read-modify-write patterns are unsafe and may cause the
-  queryable state server to fail due to concurrent modifications.
-  The RocksDBStateBackend is safe from these issues.
+  <strong>Attention:</strong> When a state object is queried, it is accessed from a concurrent
+  thread without any synchronization or copying. This is a design choice, as either of these would
+  increase job latency, which we wanted to avoid. Since any state backend using Java heap space,
+  <i>e.g.</i> <code>MemoryStateBackend</code> or <code>FsStateBackend</code>, does not work
+  with copies when retrieving values but instead directly references the stored values, read-modify-write
+  patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications.
+  The <code>RocksDBStateBackend</code> is safe from these issues.
 </div>
 
+## Architecture
+
+Before showing how to use Queryable State, it is useful to briefly describe the entities that compose it.
+The Queryable State feature consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
+    for receiving the client's queries, fetching the requested state from the responsible `TaskManager` on its behalf, and
+    returning it to the client, and
+ 3. the `QueryableStateServer`, which runs on each `TaskManager` and is responsible for serving the locally stored state.
+
+In a nutshell, the client connects to one of the proxies and sends a request for the state associated with a specific
+key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in
+*Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is
+responsible for the key group holding `k`, the proxy asks the `JobManager`. Based on the answer, the proxy then
+queries the `QueryableStateServer` running on that `TaskManager` for the state associated with `k`, and forwards the
+response back to the client.
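The proxy's lookup rests on a deterministic mapping from a key to its key group, and from a key group to the parallel operator instance that owns it. Below is a minimal sketch of that mapping under stated assumptions, not Flink's code: `KeyGroupLookupSketch` and its methods are hypothetical, and a simple bit-mixing function stands in for the murmur hash Flink applies to `key.hashCode()`, so concrete key-group numbers will differ from a real job. `ownerOf` assumes that each parallel instance owns one contiguous range of key groups.

```java
import java.util.Objects;

/**
 * Hypothetical sketch of the key -> key group -> operator instance mapping.
 * Not Flink's implementation: mix() is a stand-in for Flink's murmur hash.
 */
final class KeyGroupLookupSketch {

    // Stand-in hash that spreads the bits of hashCode() (assumption, not Flink's function).
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x45d9f3b;
        h ^= (h >>> 16);
        return h;
    }

    /** Key group of a key, always in [0, maxParallelism). */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(mix(Objects.hashCode(key)), maxParallelism);
    }

    /**
     * Index of the parallel instance owning a key group, assuming each of the
     * `parallelism` instances owns one contiguous range of key groups.
     */
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }
}
```

For example, with `maxParallelism = 128` and a parallelism of 4, `ownerOf` maps key groups 0 to 31 to instance 0 and key groups 96 to 127 to instance 3. Knowing which `TaskManager` hosts each instance is the part the proxy has to ask the `JobManager` for.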
+
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, copy the
+`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{ site.version }}.jar`
+from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
+to the `lib/` folder. Without this jar in `lib/`, the queryable state feature is not enabled.
+
+To verify that your cluster is running with queryable state enabled, check the logs of any
+`TaskManager` for the line: `"Started the Queryable State Proxy Server @ ..."`.
+
 ## Making State Queryable
 
-In order to make state queryable, the queryable state server first needs to be enabled globally
-by setting the `query.server.enable` configuration parameter to `true` (current default).
-Then appropriate state needs to be made queryable by using either
+Now that you have activated queryable state on your cluster, it is time to see how to use it. In order for a state to 
+be visible to the outside world, it needs to be explicitly made queryable by using:
 
-* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as
-queryable state, or
-* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an
-operator queryable.
+* either a `QueryableStateStream`, a convenience object which acts as a sink and offers its incoming values as queryable
+state, or
+* the `stateDescriptor.setQueryable(String queryableStateName)` method, which makes the keyed state represented by the
+ state descriptor queryable.
 
 The following sections explain the use of these two approaches.
 
 ### Queryable State Stream
 
-A `KeyedStream` may offer its values as queryable state by using the following methods:
+Calling `.asQueryableState(stateName, stateDescriptor)` on a `KeyedStream` returns a `QueryableStateStream`, which offers
+its values as queryable state. Depending on the type of state, the following variants of the `asQueryableState()`
+method exist:
 
 {% highlight java %}
 // ValueState
@@ -91,18 +121,16 @@ QueryableStateStream asQueryableState(
   list which may not be cleaned up and thus will eventually consume too much memory.
 </div>
 
-A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and
-currently only holds the name as well as the value and key serializer for the queryable state
-stream. It is comparable to a sink, and cannot be followed by further transformations.
+The returned `QueryableStateStream` can be seen as a sink and **cannot** be further transformed. Internally, a
+`QueryableStateStream` gets translated to an operator which uses all incoming records to update the queryable state
+instance. The updating logic is implied by the type of the `StateDescriptor` provided in the `asQueryableState` call.
+In a program like the following, all records of the keyed stream will be used to update the state instance via
+`ValueState.update(value)`:
 
-Internally a `QueryableStateStream` gets translated to an operator which uses all incoming
-records to update the queryable state instance.
-In a program like the following, all records of the keyed stream will be used to update the state
-instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on
-the chosen state variant:
 {% highlight java %}
 stream.keyBy(0).asQueryableState("query-name")
 {% endhighlight %}
+
 This acts like the Scala API's `flatMapWithState`.
 
 ### Managed Keyed State
@@ -110,7 +138,7 @@ This acts like the Scala API's `flatMapWithState`.
 Managed keyed state of an operator
 (see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
 can be made queryable by making the appropriate state descriptor queryable via
-`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below:
+`StateDescriptor.setQueryable(String queryableStateName)`, as in the example below:
 {% highlight java %}
 ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
         new ValueStateDescriptor<>(
@@ -119,61 +147,79 @@ ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                 Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
 descriptor.setQueryable("query-name"); // queryable state name
 {% endhighlight %}
+
 <div class="alert alert-info">
-  <strong>Note:</strong> The `queryableStateName` parameter may be chosen arbitrarily and is only
+  <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only
   used for queries. It does not have to be identical to the state's own name.
 </div>
 
+This variant has no limitations as to which type of state can be made queryable. It can therefore be used for
+any `ValueState`, `ReducingState`, `ListState`, `MapState`, `AggregatingState`, and the currently deprecated `FoldingState`.
 
 ## Querying State
 
-The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that
-serve the state internally. It needs to be set up with a valid `JobManager` address and port and is
-created as follows:
+So far, you have set up your cluster to run with queryable state and you have declared (some of) your state as
+queryable. Now it is time to see how to query this state. 
 
-{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
-config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+For this you can use the `QueryableStateClient` helper class. It is available in the `flink-queryable-state-client-java`
+jar, which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below:
 
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-queryable-state-client-java_{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+
+For more on this, you can check how to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html).
+
+The `QueryableStateClient` will submit your query to the internal proxy, which will then process your query and return
+the final result. The only requirement to initialize the client is to provide a valid `TaskManager` hostname (remember
+that there is a queryable state proxy running on each `TaskManager`) and the port where the proxy listens. See the
+[Configuration Section](#configuration) for how to configure the proxy and state server port(s).
 
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
+{% highlight java %}
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 {% endhighlight %}
 
-The query method is this:
+With the client ready, to query a state of type `V`, associated with a key of type `K`, you can use the method:
 
 {% highlight java %}
-Future<byte[]> getKvState(
-    JobID jobID,
-    String queryableStateName,
-    int keyHashCode,
-    byte[] serializedKeyAndNamespace)
+<K, S extends State, V> CompletableFuture<S> getKvState(
+    final JobID jobId,
+    final String queryableStateName,
+    final K key,
+    final TypeInformation<K> keyTypeInfo,
+    final StateDescriptor<S, V> stateDescriptor)
 {% endhighlight %}
 
-A call to this method returns a `Future` eventually holding the serialized state value for the
-queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The
-`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the
-`serializedKeyAndNamespace` is the serialized key and namespace.
+The above returns a `CompletableFuture` eventually holding the state value for the queryable state instance identified
+by `queryableStateName` of the job with ID `jobId`. The `key` is the key whose state you are interested in and the
+`keyTypeInfo` tells Flink how to serialize/deserialize it. Finally, the `stateDescriptor` contains the necessary
+information about the requested state, namely its type (`Value`, `Reduce`, etc.) and how to serialize/deserialize it.
+
+The careful reader will notice that the returned future contains a value of type `S`, *i.e.* a `State` object containing
+the actual value. This can be any of the state types supported by Flink: `ValueState`, `ReducingState`, `ListState`, `MapState`,
+`AggregatingState`, and the currently deprecated `FoldingState`.
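Because `getKvState` is asynchronous, a caller either attaches a callback to the returned future or blocks on it with a timeout. The sketch below shows both patterns using only JDK `CompletableFuture` calls; `AsyncQuerySketch` and `fakeQuery` are hypothetical stand-ins for a real client call, used only so the example runs without a Flink cluster.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical sketch of consuming an asynchronous query result.
 * fakeQuery() stands in for client.getKvState(...) and is not a Flink API.
 */
final class AsyncQuerySketch {

    // Stand-in for a remote query: completes asynchronously with a derived value.
    static CompletableFuture<Long> fakeQuery(long key) {
        return CompletableFuture.supplyAsync(() -> key * 10);
    }

    // Blocking style: wait at most timeoutMillis for the answer, else TimeoutException.
    static long queryBlocking(long key, long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        return fakeQuery(key).get(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    // Non-blocking style: transform the answer and turn failures into a fallback value.
    static CompletableFuture<String> queryAsync(long key) {
        return fakeQuery(key)
                .thenApply(v -> "value=" + v)
                .exceptionally(t -> "query failed: " + t.getMessage());
    }
}
```

The blocking form is convenient for simple tools and debugging; the callback form avoids tying up the calling thread, which matters when one shared client issues many queries.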
+
+<div class="alert alert-info">
+  <strong>Note:</strong> These state objects do not allow modifications to the contained state. You can use them to get
+  the actual value of the state, <i>e.g.</i> using <code>valueState.value()</code>, or iterate over
+  the contained <code>&lt;K, V&gt;</code> entries, <i>e.g.</i> using <code>mapState.entries()</code>, but you cannot
+  modify them. As an example, calling the <code>add()</code> method on a returned list state will throw an
+  <code>UnsupportedOperationException</code>.
+</div>
+
 <div class="alert alert-info">
   <strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
-  to be shutdown via <code>QueryableStateClient#shutdown()</code> when unused in order to free
+  to be shut down via <code>QueryableStateClient.shutdown()</code> when unused in order to free
   resources.
 </div>
 
-The current implementation is still pretty low-level in the sense that it only works with
-serialized data both for providing the key/namespace and the returned results. It is the
-responsibility of the user (or some follow-up utilities) to set up the serializers for this. The
-nice thing about this is that the query services don't have to get into the business of worrying
-about any class loading issues etc.
-
-There are some serialization utils for key/namespace and value serialization included in
-`KvStateRequestSerializer`.
-
 ### Example
 
 The following example extends the `CountWindowAverage` example
@@ -183,7 +229,7 @@ by making it queryable and showing how to query this value:
 {% highlight java %}
 public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
 
-    private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
+    private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
 
     @Override
     public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -214,74 +260,51 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
 Once used in a job, you can retrieve the job ID and then query any key's current state from this operator:
 
 {% highlight java %}
-final Configuration config = new Configuration();
-config.setString(JobManagerOptions.ADDRESS, queryAddress);
-config.setInteger(JobManagerOptions.PORT, queryPort);
-
-final HighAvailabilityServices highAvailabilityServices =
-      HighAvailabilityServicesUtils.createHighAvailabilityServices(
-           config,
-           Executors.newSingleThreadScheduledExecutor(),
-           HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
-
-final TypeSerializer<Long> keySerializer =
-        TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
-final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
-        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
-
-final byte[] serializedKey =
-        KvStateRequestSerializer.serializeKeyAndNamespace(
-                key, keySerializer,
-                VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-
-Future<byte[]> serializedResult =
-        client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
-
-// now wait for the result and return it
-final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
-byte[] serializedValue = Await.result(serializedResult, duration);
-Tuple2<Long, Long> value =
-        KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
-{% endhighlight %}
-
-### Note for Scala Users
-
-Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import:
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
 
-```scala
-import org.apache.flink.streaming.api.scala._
-```
-
-Now you can create the type serializers as follows:
-
-```scala
-val keySerializer = createTypeInformation[Long]
-  .createSerializer(new ExecutionConfig)
-```
-
-If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime.
+// the state descriptor of the state to be fetched.
+ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+        new ValueStateDescriptor<>(
+          "average",
+          TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
+          Tuple2.of(0L, 0L));
+
+CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
+        client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
+
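+// handle the result once the query completes (the callback runs asynchronously)
+resultFuture.thenAccept(response -> {
+        try {
+            Tuple2<Long, Long> res = response.value(); // (count, sum) for the queried key
+            System.out.println("count=" + res.f0 + ", sum=" + res.f1);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+});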
+{% endhighlight %}
 
 ## Configuration
 
 The following configuration parameters influence the behaviour of the queryable state server and proxy.
 They are defined in `QueryableStateOptions`.
 
-### Server
-* `query.server.enable`: flag to indicate whether to start the queryable state server
-* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick random available port)
-* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots)
-* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots).
+### State Server
+* `query.server.ports`: the port range of the queryable state server. This is useful to avoid port clashes if more
+   than one `TaskManager` runs on the same machine. The specified range can be a single port: "9123", a range of ports: "50100-50200",
+   or a list of ranges and/or single ports: "50100-50200,50300-50400,51234". The default port is 9067.
+* `query.server.network-threads`: number of network (event loop) threads receiving incoming requests for the state server (0 => #slots)
+* `query.server.query-threads`: number of threads handling/serving incoming requests for the state server (0 => #slots).
+
 
-### Client (`QueryableStateClient`)
-* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores)
-* `query.client.lookup.num-retries`: number of retries on location lookup failures
-* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis)
+### Proxy
+* `query.proxy.ports`: the port range of the queryable state proxy. This is useful to avoid port clashes if more
+  than one `TaskManager` runs on the same machine. The specified range can be a single port: "9123", a range of ports: "50100-50200",
+  or a list of ranges and/or single ports: "50100-50200,50300-50400,51234". The default port is 9069.
+* `query.proxy.network-threads`: number of network (event loop) threads receiving incoming requests for the client proxy (0 => #slots)
+* `query.proxy.query-threads`: number of threads handling/serving incoming requests for the client proxy (0 => #slots).
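Both `query.server.ports` and `query.proxy.ports` accept the same port-range syntax. To illustrate what each form denotes, the hypothetical `PortRangeSketch` below expands a specification into its list of candidate ports. It is a sketch under that assumption, not Flink's own parser, and it does not replicate Flink's error handling.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: expands a port specification such as "9123",
 * "50100-50200" or "50100-50200,50300-50400,51234" into candidate ports.
 * Not Flink's parser; it only illustrates the syntax.
 */
final class PortRangeSketch {

    static List<Integer> expand(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String p = part.trim();
            if (p.isEmpty()) {
                continue; // tolerate stray commas
            }
            int dash = p.indexOf('-');
            if (dash < 0) {
                ports.add(parsePort(p)); // a single port
            } else {
                int from = parsePort(p.substring(0, dash));
                int to = parsePort(p.substring(dash + 1));
                if (from > to) {
                    throw new IllegalArgumentException("Invalid range: " + p);
                }
                for (int port = from; port <= to; port++) {
                    ports.add(port); // inclusive range
                }
            }
        }
        return ports;
    }

    private static int parsePort(String s) {
        int port = Integer.parseInt(s.trim());
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Port out of range: " + port);
        }
        return port;
    }
}
```

For instance, `"50100-50102,51234"` expands to 50100, 50101, 50102 and 51234. Offering several candidates is presumably what lets multiple `TaskManager`s on one host each find a free port instead of clashing on a single fixed one.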
 
 ## Limitations
 
-* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register
+* The queryable state life-cycle is bound to the life-cycle of the job, *e.g.* tasks register
 queryable state on startup and unregister it on disposal. In future versions, it is desirable to
 decouple this in order to allow queries after a task finishes, and to speed up recovery via state
 replication.