Posted to commits@flink.apache.org by kk...@apache.org on 2017/11/07 13:14:09 UTC
[2/2] flink git commit: [FLINK-7822][QS][doc] Update Queryable State docs.
[FLINK-7822][QS][doc] Update Queryable State docs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81999545
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81999545
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81999545
Branch: refs/heads/release-1.4
Commit: 819995454611be6a85e2933318d053b2c25a18f7
Parents: aab36f9
Author: kkloudas <kk...@gmail.com>
Authored: Mon Nov 6 17:21:45 2017 +0100
Committer: kkloudas <kk...@gmail.com>
Committed: Tue Nov 7 14:12:59 2017 +0100
----------------------------------------------------------------------
docs/dev/stream/state/queryable_state.md | 265 ++++++++++++++------------
1 file changed, 144 insertions(+), 121 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/81999545/docs/dev/stream/state/queryable_state.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/state/queryable_state.md b/docs/dev/stream/state/queryable_state.md
index 8012f67..af646df 100644
--- a/docs/dev/stream/state/queryable_state.md
+++ b/docs/dev/stream/state/queryable_state.md
@@ -32,38 +32,68 @@ under the License.
likely that there will be breaking API changes on the client side in the upcoming Flink versions.
</div>
-In a nutshell, this feature allows users to query Flink's managed partitioned state
-(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) from outside of
-Flink. For some scenarios, queryable state thus eliminates the need for distributed
-operations/transactions with external systems such as key-value stores which are often the
-bottleneck in practice.
+In a nutshell, this feature exposes Flink's managed keyed (partitioned) state
+(see [Working with State]({{ site.baseurl }}/dev/stream/state/state.html)) to the outside world and
+allows the user to query a job's state from outside Flink. For some scenarios, queryable state
+eliminates the need for distributed operations/transactions with external systems such as key-value
+stores which are often the bottleneck in practice. In addition, this feature may be particularly
+useful for debugging purposes.
<div class="alert alert-warning">
- <strong>Attention:</strong> Queryable state accesses keyed state from a concurrent thread rather
- than synchronizing with the operator and potentially blocking its operation. Since any state
- backend using Java heap space, e.g. MemoryStateBackend or
- FsStateBackend, does not work with copies when retrieving values but instead directly
- references the stored values, read-modify-write patterns are unsafe and may cause the
- queryable state server to fail due to concurrent modifications.
- The RocksDBStateBackend is safe from these issues.
+ <strong>Attention:</strong> When querying a state object, that object is accessed from a concurrent
+ thread without any synchronization or copying. This is a design choice, as any of the above would lead
+ to increased job latency, which we wanted to avoid. Since any state backend using Java heap space,
+ <i>e.g.</i> <code>MemoryStateBackend</code> or <code>FsStateBackend</code>, does not work
+ with copies when retrieving values but instead directly references the stored values, read-modify-write
+ patterns are unsafe and may cause the queryable state server to fail due to concurrent modifications.
+ The <code>RocksDBStateBackend</code> is safe from these issues.
</div>
+## Architecture
+
+Before showing how to use the Queryable State, it is useful to briefly describe the entities that compose it.
+The Queryable State feature consists of three main entities:
+
+ 1. the `QueryableStateClient`, which (potentially) runs outside the Flink cluster and submits the user queries,
+ 2. the `QueryableStateClientProxy`, which runs on each `TaskManager` (*i.e.* inside the Flink cluster) and is responsible
+ for receiving the client's queries, fetching the requested state from the responsible `TaskManager` on its behalf, and
+ returning it to the client, and
+ 3. the `QueryableStateServer`, which runs on each `TaskManager` and is responsible for serving the locally stored state.
+
+In a nutshell, the client will connect to one of the proxies and send a request for the state associated with a specific
+key, `k`. As stated in [Working with State]({{ site.baseurl }}/dev/stream/state/state.html), keyed state is organized in
+*Key Groups*, and each `TaskManager` is assigned a number of these key groups. To discover which `TaskManager` is
+responsible for the key group holding `k`, the proxy will ask the `JobManager`. Based on the answer, the proxy will
+then query the `QueryableStateServer` running on that `TaskManager` for the state associated with `k`, and forward the
+response back to the client.
+
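The lookup described above can be pictured with a small, self-contained sketch. It is only an illustration: the class `KeyGroupRoutingSketch`, the plain `hashCode()`-based hash, and the contiguous split of key groups across parallel instances are assumptions made for this example, not Flink's actual implementation.

```java
/** Illustrative sketch only: routes a key to a key group and then to a parallel instance. */
final class KeyGroupRoutingSketch {

    /** Maps a key to a key group in [0, maxParallelism). Stand-in hash, not the one Flink uses. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the parallel instance assumed to own it (contiguous ranges). */
    static int ownerOf(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // hypothetical number of key groups
        int parallelism = 4;      // hypothetical number of parallel instances
        Long key = 42L;

        int keyGroup = keyGroupFor(key, maxParallelism);
        int owner = ownerOf(keyGroup, maxParallelism, parallelism);
        System.out.println("key " + key + " -> key group " + keyGroup + " -> instance " + owner);
    }
}
```

In this picture, the proxy plays the role of the caller: it resolves which instance owns the key group of `k` and forwards the request there.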
+## Activating Queryable State
+
+To enable queryable state on your Flink cluster, you just have to copy the
+`flink-queryable-state-runtime{{ site.scala_version_suffix }}-{{site.version }}.jar`
+from the `opt/` folder of your [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
+to the `lib/` folder. Otherwise, the queryable state feature is not enabled.
+
+To verify that your cluster is running with queryable state enabled, check the logs of any
+task manager for the line: `"Started the Queryable State Proxy Server @ ..."`.
+
## Making State Queryable
-In order to make state queryable, the queryable state server first needs to be enabled globally
-by setting the `query.server.enable` configuration parameter to `true` (current default).
-Then appropriate state needs to be made queryable by using either
+Now that you have activated queryable state on your cluster, it is time to see how to use it. In order for a state to
+be visible to the outside world, it needs to be explicitly made queryable by using:
-* a `QueryableStateStream`, a convenience object which behaves like a sink and offers incoming values as
-queryable state, or
-* `StateDescriptor#setQueryable(String queryableStateName)`, which makes the keyed state of an
-operator queryable.
+* either a `QueryableStateStream`, a convenience object which acts as a sink and offers its incoming values as queryable
+state, or
+* the `stateDescriptor.setQueryable(String queryableStateName)` method, which makes the keyed state represented by the
+ state descriptor queryable.
The following sections explain the use of these two approaches.
### Queryable State Stream
-A `KeyedStream` may offer its values as queryable state by using the following methods:
+Calling `.asQueryableState(stateName, stateDescriptor)` on a `KeyedStream` returns a `QueryableStateStream` which offers
+its values as queryable state. Depending on the type of state, there are the following variants of the `asQueryableState()`
+method:
{% highlight java %}
// ValueState
@@ -91,18 +121,16 @@ QueryableStateStream asQueryableState(
list which may not be cleaned up and thus will eventually consume too much memory.
</div>
-A call to these methods returns a `QueryableStateStream`, which cannot be further transformed and
-currently only holds the name as well as the value and key serializer for the queryable state
-stream. It is comparable to a sink, and cannot be followed by further transformations.
+The returned `QueryableStateStream` can be seen as a sink and **cannot** be further transformed. Internally, a
+`QueryableStateStream` gets translated to an operator which uses all incoming records to update the queryable state
+instance. The updating logic is implied by the type of the `StateDescriptor` provided in the `asQueryableState` call.
+In a program like the following, all records of the keyed stream will be used to update the state instance via
+`ValueState.update(value)`:
-Internally a `QueryableStateStream` gets translated to an operator which uses all incoming
-records to update the queryable state instance.
-In a program like the following, all records of the keyed stream will be used to update the state
-instance, either via `ValueState#update(value)` or `AppendingState#add(value)`, depending on
-the chosen state variant:
{% highlight java %}
stream.keyBy(0).asQueryableState("query-name")
{% endhighlight %}
+
This acts like the Scala API's `flatMapWithState`.
### Managed Keyed State
@@ -110,7 +138,7 @@ This acts like the Scala API's `flatMapWithState`.
Managed keyed state of an operator
(see [Using Managed Keyed State]({{ site.baseurl }}/dev/stream/state/state.html#using-managed-keyed-state))
can be made queryable by making the appropriate state descriptor queryable via
-`StateDescriptor#setQueryable(String queryableStateName)`, as in the example below:
+`StateDescriptor.setQueryable(String queryableStateName)`, as in the example below:
{% highlight java %}
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
@@ -119,61 +147,79 @@ ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
descriptor.setQueryable("query-name"); // queryable state name
{% endhighlight %}
+
<div class="alert alert-info">
- <strong>Note:</strong> The `queryableStateName` parameter may be chosen arbitrarily and is only
+ <strong>Note:</strong> The <code>queryableStateName</code> parameter may be chosen arbitrarily and is only
used for queries. It does not have to be identical to the state's own name.
</div>
+This variant has no limitations as to which type of state can be made queryable: it can be used for
+any `ValueState`, `ReducingState`, `ListState`, `MapState`, `AggregatingState`, and the currently deprecated `FoldingState`.
## Querying State
-The `QueryableStateClient` helper class may be used for queries against the `KvState` instances that
-serve the state internally. It needs to be set up with a valid `JobManager` address and port and is
-created as follows:
+So far, you have set up your cluster to run with queryable state and you have declared (some of) your state as
+queryable. Now it is time to see how to query this state.
-{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, queryAddress);
-config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, queryPort);
+For this you can use the `QueryableStateClient` helper class. This is available in the `flink-queryable-state-client`
+jar which you have to explicitly include as a dependency in the `pom.xml` of your project, as shown below:
-final HighAvailabilityServices highAvailabilityServices =
- HighAvailabilityServicesUtils.createHighAvailabilityServices(
- config,
- Executors.newSingleThreadScheduledExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
+<div data-lang="java" markdown="1">
+{% highlight xml %}
+<dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-queryable-state-client-java{{ site.scala_version_suffix }}</artifactId>
+ <version>{{site.version }}</version>
+</dependency>
+{% endhighlight %}
+</div>
+
+For more on this, you can check how to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html).
+
+The `QueryableStateClient` will submit your query to the internal proxy, which will then process your query and return
+the final result. The only requirement to initialize the client is to provide a valid `TaskManager` hostname (remember
+that there is a queryable state proxy running on each task manager) and the port where the proxy listens. See the
+[Configuration Section](#configuration) for how to configure the proxy and state server port(s).
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
+{% highlight java %}
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
{% endhighlight %}
-The query method is this:
+With the client ready, to query a state of type `V`, associated with a key of type `K`, you can use the method:
{% highlight java %}
-Future<byte[]> getKvState(
- JobID jobID,
- String queryableStateName,
- int keyHashCode,
- byte[] serializedKeyAndNamespace)
+CompletableFuture<S> getKvState(
+ final JobID jobId,
+ final String queryableStateName,
+ final K key,
+ final TypeInformation<K> keyTypeInfo,
+ final StateDescriptor<S, V> stateDescriptor)
{% endhighlight %}
-A call to this method returns a `Future` eventually holding the serialized state value for the
-queryable state instance identified by `queryableStateName` of the job with ID `jobID`. The
-`keyHashCode` is the hash code of the key as returned by `Object.hashCode()` and the
-`serializedKeyAndNamespace` is the serialized key and namespace.
+The above returns a `CompletableFuture` eventually holding the state value for the queryable state instance identified
+by `queryableStateName` of the job with ID `jobId`. The `key` is the key whose state you are interested in and the
+`keyTypeInfo` will tell Flink how to serialize/deserialize it. Finally, the `stateDescriptor` contains the necessary
+information about the requested state, namely its type (`Value`, `Reduce`, etc.) and the necessary information on how
+to serialize/deserialize it.
+
+The careful reader will notice that the returned future contains a value of type `S`, *i.e.* a `State` object containing
+the actual value. This can be any of the state types supported by Flink: `ValueState`, `ReducingState`, `ListState`, `MapState`,
+`AggregatingState`, and the currently deprecated `FoldingState`.
+
+<div class="alert alert-info">
+ <strong>Note:</strong> These state objects do not allow modifications to the contained state. You can use them to get
+ the actual value of the state, <i>e.g.</i> using <code>valueState.value()</code>, or iterate over
+ the contained <code>&lt;K, V&gt;</code> entries, <i>e.g.</i> using <code>mapState.entries()</code>, but you cannot
+ modify them. As an example, calling the <code>add()</code> method on a returned list state will throw an
+ <code>UnsupportedOperationException</code>.
+</div>
+
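To make the read-only behaviour concrete, here is a minimal stand-alone sketch. `ReadOnlyListStateSketch` is a hypothetical class written for this illustration; it only mimics the contract described in the note (reads succeed, modifications throw) and is not the class the client actually returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Hypothetical read-only list "state": reading works, any modification is rejected. */
final class ReadOnlyListStateSketch<T> {

    private final List<T> values;

    ReadOnlyListStateSketch(List<T> values) {
        // Defensive copy so later changes to the caller's list do not leak in.
        this.values = Collections.unmodifiableList(new ArrayList<>(values));
    }

    /** Returns the contained elements for iteration. */
    Iterable<T> get() {
        return values;
    }

    /** Always fails: the queried state cannot be modified from the client side. */
    void add(T value) {
        throw new UnsupportedOperationException("State is read-only.");
    }

    public static void main(String[] args) {
        ReadOnlyListStateSketch<Long> state = new ReadOnlyListStateSketch<>(Arrays.asList(1L, 2L, 3L));
        for (Long v : state.get()) {
            System.out.println(v);
        }
        try {
            state.add(4L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```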
<div class="alert alert-info">
<strong>Note:</strong> The client is asynchronous and can be shared by multiple threads. It needs
- to be shutdown via <code>QueryableStateClient#shutdown()</code> when unused in order to free
+ to be shut down via <code>QueryableStateClient.shutdown()</code> when unused in order to free
resources.
</div>
-The current implementation is still pretty low-level in the sense that it only works with
-serialized data both for providing the key/namespace and the returned results. It is the
-responsibility of the user (or some follow-up utilities) to set up the serializers for this. The
-nice thing about this is that the query services don't have to get into the business of worrying
-about any class loading issues etc.
-
-There are some serialization utils for key/namespace and value serialization included in
-`KvStateRequestSerializer`.
-
### Example
The following example extends the `CountWindowAverage` example
@@ -183,7 +229,7 @@ by making it queryable and showing how to query this value:
{% highlight java %}
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
- private transient ValueState<Tuple2<Long /* count */, Long /* sum */>> sum;
+ private transient ValueState<Tuple2<Long, Long>> sum; // a tuple containing the count and the sum
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
@@ -214,74 +260,51 @@ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>,
Once used in a job, you can retrieve the job ID and then query any key's current state from this operator:
{% highlight java %}
-final Configuration config = new Configuration();
-config.setString(JobManagerOptions.ADDRESS, queryAddress);
-config.setInteger(JobManagerOptions.PORT, queryPort);
-
-final HighAvailabilityServices highAvailabilityServices =
- HighAvailabilityServicesUtils.createHighAvailabilityServices(
- config,
- Executors.newSingleThreadScheduledExecutor(),
- HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-QueryableStateClient client = new QueryableStateClient(config, highAvailabilityServices);
-
-final TypeSerializer<Long> keySerializer =
- TypeInformation.of(new TypeHint<Long>() {}).createSerializer(new ExecutionConfig());
-final TypeSerializer<Tuple2<Long, Long>> valueSerializer =
- TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}).createSerializer(new ExecutionConfig());
-
-final byte[] serializedKey =
- KvStateRequestSerializer.serializeKeyAndNamespace(
- key, keySerializer,
- VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE);
-
-Future<byte[]> serializedResult =
- client.getKvState(jobId, "query-name", key.hashCode(), serializedKey);
-
-// now wait for the result and return it
-final FiniteDuration duration = new FiniteDuration(1, TimeUnit.SECONDS);
-byte[] serializedValue = Await.result(serializedResult, duration);
-Tuple2<Long, Long> value =
- KvStateRequestSerializer.deserializeValue(serializedValue, valueSerializer);
-{% endhighlight %}
-
-### Note for Scala Users
-
-Please use the available Scala extensions when creating the `TypeSerializer` instances. Add the following import:
+QueryableStateClient client = new QueryableStateClient(tmHostname, proxyPort);
-```scala
-import org.apache.flink.streaming.api.scala._
-```
-
-Now you can create the type serializers as follows:
-
-```scala
-val keySerializer = createTypeInformation[Long]
- .createSerializer(new ExecutionConfig)
-```
-
-If you don't do this, you can run into mismatches between the serializers used in the Flink job and in your client code, because types like `scala.Long` cannot be captured at runtime.
+// the state descriptor of the state to be fetched.
+ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
+ new ValueStateDescriptor<>(
+ "average",
+ TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
+ Tuple2.of(0L, 0L));
+
+CompletableFuture<ValueState<Tuple2<Long, Long>>> resultFuture =
+ client.getKvState(jobId, "query-name", key, BasicTypeInfo.LONG_TYPE_INFO, descriptor);
+
+// now handle the returned value
+resultFuture.thenAccept(response -> {
+    try {
+        // ValueState exposes its content via value(); the returned state object is read-only
+        Tuple2<Long, Long> res = response.value();
+    } catch (Exception e) {
+        e.printStackTrace();
+    }
+});
+{% endhighlight %}
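If a caller prefers to wait for the answer instead of registering a callback, a bounded wait with a fallback is one option. The sketch below uses only `java.util.concurrent`; the class `FutureHandlingSketch`, its `getOrDefault` helper, and the stand-in futures are assumptions for illustration and do not involve the Flink client.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Illustrative helper: bounded wait on a CompletableFuture with a fallback value. */
final class FutureHandlingSketch {

    /** Waits at most timeoutMillis; returns fallback on timeout, failure, or interruption. */
    static <T> T getOrDefault(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return fallback;
        } catch (ExecutionException | TimeoutException e) {
            return fallback;
        }
    }

    public static void main(String[] args) {
        // Stand-ins for the three outcomes a query can have.
        CompletableFuture<Long> answered = CompletableFuture.completedFuture(7L);
        CompletableFuture<Long> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("query failed"));
        CompletableFuture<Long> pending = new CompletableFuture<>();

        System.out.println(getOrDefault(answered, 100, -1L));
        System.out.println(getOrDefault(failed, 100, -1L));
        System.out.println(getOrDefault(pending, 50, -1L));
    }
}
```

The callback style shown in the example keeps the calling thread free, while a bounded wait like this is simpler when the result is needed before continuing.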
## Configuration
The following configuration parameters influence the behaviour of the queryable state server and client.
They are defined in `QueryableStateOptions`.
-### Server
-* `query.server.enable`: flag to indicate whether to start the queryable state server
-* `query.server.port`: port to bind to the internal `KvStateServer` (0 => pick random available port)
-* `query.server.network-threads`: number of network (event loop) threads for the `KvStateServer` (0 => #slots)
-* `query.server.query-threads`: number of asynchronous query threads for the `KvStateServerHandler` (0 => #slots).
+### State Server
+* `query.server.ports`: the server port range of the queryable state server. This is useful to avoid port clashes if more
+ than one task manager runs on the same machine. The specified range can be: a port: "9123", a range of ports: "50100-50200",
+ or a list of ranges and/or ports: "50100-50200,50300-50400,51234". The default port is 9067.
+* `query.server.network-threads`: number of network (event loop) threads receiving incoming requests for the state server (0 => #slots)
+* `query.server.query-threads`: number of threads handling/serving incoming requests for the state server (0 => #slots).
+
-### Client (`QueryableStateClient`)
-* `query.client.network-threads`: number of network (event loop) threads for the `KvStateClient` (0 => number of available cores)
-* `query.client.lookup.num-retries`: number of retries on location lookup failures
-* `query.client.lookup.retry-delay`: retry delay on location lookup failures (millis)
+### Proxy
+* `query.proxy.ports`: the server port range of the queryable state proxy. This is useful to avoid port clashes if more
+ than one task manager runs on the same machine. The specified range can be: a port: "9123", a range of ports: "50100-50200",
+ or a list of ranges and/or ports: "50100-50200,50300-50400,51234". The default port is 9069.
+* `query.proxy.network-threads`: number of network (event loop) threads receiving incoming requests for the client proxy (0 => #slots)
+* `query.proxy.query-threads`: number of threads handling/serving incoming requests for the client proxy (0 => #slots).
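The three accepted forms of a port specification (a single port, a range, or a comma-separated list of both) can be illustrated with a small parser. This is a sketch under assumptions: `PortRangeSketch` is a hypothetical class written for this example and does not reflect how Flink itself parses these options.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative parser for specs such as "9123", "50100-50200" or "50100-50200,50300-50400,51234". */
final class PortRangeSketch {

    /** Expands the specification into the list of candidate ports, in the order given. */
    static List<Integer> parse(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String token = part.trim();
            int dash = token.indexOf('-');
            // A token without a dash is a single port; otherwise it is "start-end".
            int start = Integer.parseInt(dash < 0 ? token : token.substring(0, dash).trim());
            int end = dash < 0 ? start : Integer.parseInt(token.substring(dash + 1).trim());
            if (start < 0 || end > 65535 || start > end) {
                throw new IllegalArgumentException("Invalid port range: " + token);
            }
            for (int port = start; port <= end; port++) {
                ports.add(port);
            }
        }
        return ports;
    }

    public static void main(String[] args) {
        System.out.println(parse("9123"));
        System.out.println(parse("50100-50200").size());
        System.out.println(parse("50100-50200,50300-50400,51234").size());
    }
}
```

With several task managers on one machine, a range gives each of them a different port to bind to, which is the port clash scenario the options above mention.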
## Limitations
-* The queryable state life-cycle is bound to the life-cycle of the job, e.g. tasks register
+* The queryable state life-cycle is bound to the life-cycle of the job, *e.g.* tasks register
queryable state on startup and unregister it on disposal. In future versions, it is desirable to
decouple this in order to allow queries after a task finishes, and to speed up recovery via state
replication.