Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/12 14:26:47 UTC

[GitHub] [flink] qinjunjerry commented on a change in pull request #10502: [FLINK-14825][state-processor-api][docs] Rework state processor api documentation

qinjunjerry commented on a change in pull request #10502: [FLINK-14825][state-processor-api][docs] Rework state processor api documentation
URL: https://github.com/apache/flink/pull/10502#discussion_r357173100
 
 

 ##########
 File path: docs/dev/libs/state_processor_api.md
 ##########
 @@ -23,166 +23,99 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink's State Processor API provides powerful functionality to reading, writing, and modifing savepoints and checkpoints using Flink’s batch DataSet api.
-This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
+Due to the [interoperability of DataSet and Table API](https://ci.apache.org/projects/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api), you can even use relational Table API or SQL queries to analyze and process state data.
+
+For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly.
+Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application.
+It's also possible to fix inconsistent state entries.
+Finally, the State Processor API opens up many ways to evolve a stateful application that were previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started.
+For example, you can now arbitrarily modify the data types of states, adjust the maximum parallelism of operators, split or merge operator state, re-assign operator UIDs, and so on.
+
+To get started with the State Processor API, include the following library in your application.
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-state-processor-api{{ site.scala_version_suffix }}</artifactId>
+  <version>{{site.version}}</version>
+  <scope>provided</scope>
+</dependency>
+{% endhighlight %}
 
 * This will be replaced by the TOC
 {:toc}
 
-## Abstraction
-
-To understand how to best interact with savepoints in a batch context it is important to have a clear mental model of how the data in Flink state relates to a traditional relational database.
+## Mapping Application State to DataSets 
 
-A database can be thought of as one or more namespaces, each containing a collection of tables.
-Those tables in turn contain columns whose values have some intrinsic relationship between them, such as being scoped under the same key.
+The State Processor API maps the state of a streaming application to one or more data sets that can be separately processed.
+In order to be able to use the API, you need to understand how this mapping works.
 
-A savepoint represents the state of a Flink job at a particular point in time which is made up of many operators.
-Those operators contain various kinds of state, both partitioned or keyed state, and non-partitioned or operator state. 
+But let's first have a look at what a stateful Flink job looks like.
+A Flink job is composed of operators, typically one or more source operators, a few operators for the actual processing, and one or more sink operators.
+Each operator runs in parallel in one or more tasks and can work with different types of state.
+An operator can have zero, one, or more *“operator states”* which are organized as lists that are scoped to the operator's tasks.
+If the operator is applied on a keyed stream, it can also have zero, one, or more *“keyed states”* which are scoped to a key that is extracted from each processed record.
+You can think of keyed state as a distributed key-value map. 
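+
+As a minimal sketch (the class, state, and type names below are purely illustrative and not taken from the example application that follows), a user function declares keyed state and operator state roughly like this:
+
+{% highlight java %}
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.util.Collector;
+
+// Keyed state: one "count" value is kept per key of the keyed stream.
+class CountPerKey extends RichFlatMapFunction<Long, Long> {
+  private transient ValueState<Long> count;
+
+  @Override
+  public void open(Configuration parameters) {
+    count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.LONG));
+  }
+
+  @Override
+  public void flatMap(Long value, Collector<Long> out) throws Exception {
+    Long current = count.value();
+    long updated = current == null ? 1L : current + 1L;
+    count.update(updated);
+    out.collect(updated);
+  }
+}
+
+// Operator state: each parallel task keeps its own list of buffered values.
+class BufferingFunction extends RichFlatMapFunction<Long, Long> implements CheckpointedFunction {
+  private transient ListState<Long> buffered;
+
+  @Override
+  public void initializeState(FunctionInitializationContext context) throws Exception {
+    buffered = context.getOperatorStateStore()
+        .getListState(new ListStateDescriptor<>("buffered", Types.LONG));
+  }
+
+  @Override
+  public void snapshotState(FunctionSnapshotContext context) {}
+
+  @Override
+  public void flatMap(Long value, Collector<Long> out) throws Exception {
+    buffered.add(value);
+    out.collect(value);
+  }
+}
+{% endhighlight %}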
 
-<div data-lang="java" markdown="1">
-{% highlight java %}
-MapStateDescriptor<Integer, Double> CURRENCY_RATES = new MapStateDescriptor<>("rates", Types.INT, Types.DOUBLE);
- 
-class CurrencyConverter extends BroadcastProcessFunction<Transaction, CurrencyRate, Transaction> {
- 
-  public void processElement(
-        Transaction value,
-        ReadOnlyContext ctx,
-        Collector<Transaction> out) throws Exception {
- 
-     Double rate = ctx.getBroadcastState(CURRENCY_RATES).get(value.currencyId);
-     if (rate != null) {
-        value.amount *= rate;
-     }
-     out.collect(value);
-  }
- 
-  public void processBroadcastElement(
-        CurrencyRate value,
-        Context ctx,
-        Collector<Transaction> out) throws Exception {
-        ctx.getBroadcastState(CURRENCY_RATES).put(value.currencyId, value.rate);
-  }
-}
-  
-class Summarize extends RichFlatMapFunction<Transaction, Summary> {
-  transient ValueState<Double> totalState;
-  transient ValueState<Integer> countState;
- 
-  public void open(Configuration configuration) throws Exception {
-     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
-     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
-  }
- 
-  public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
-     Summary summary = new Summary();
-     summary.total = value.amount;
-     summary.count = 1;
- 
-     Double currentTotal = totalState.value();
-     if (currentTotal != null) {
-        summary.total += currentTotal;
-     }
- 
-     Integer currentCount = countState.value();
-     if (currentCount != null) {
-        summary.count += currentCount;
-     }
-     countState.update(summary.count);
- 
-     out.collect(summary);
-  }
-}
- 
-DataStream<Transaction> transactions = . . .
-BroadcastStream<CurrencyRate> rates = . . .
-transactions
-  .connect(rates)
-  .process(new CurrencyConverter())
-  .uid("currency_converter")
-  .keyBy(transaction -> transaction.accountId)
-  .flatMap(new Summarize())
-  .uid("summarize")
-{% endhighlight %}
-</div>
+The following figure shows the application “MyApp”, which consists of three operators called “Src”, “Proc”, and “Snk”.
+Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2), and Snk is stateless.
 
-This job contains multiple operators along with various kinds of state.
-When analyzing that state we can first scope data by its operator, named by setting its uid.
-Within each operator we can look at the registered states.
-`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state.
-In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row.
-Contrast this with Summarize, which contains two keyed states.
-Because both states are scoped under the same key we can safely assume there exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state.
-All of this means that the state for this job could be described using the following pseudo-sql commands. 
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 20px">
+	<img src="{{ site.baseurl }}/fig/application-my-app-state-processor-api.png" width="600px" alt="Application: My App"/>
+</p>
 
-{% highlight sql %}
-CREATE NAMESPACE currency_converter;
- 
-CREATE TABLE currency_converter.rates (
-   value Tuple2<Integer, Double>
-);
- 
-CREATE NAMESPACE summarize;
- 
-CREATE TABLE summarize.keyed_state (
-   key   INTEGER PRIMARY KEY,
-   total DOUBLE,
-   count INTEGER
-);
-{% endhighlight %}
+A savepoint or checkpoint of MyApp consists of the data of all states, organized in a way that the states of each task can be restored.
+When processing the data of a savepoint (or checkpoint) with a batch job, we need a mental model that maps the data of the individual tasks' states into data sets or tables.
+In fact, we can think of a savepoint as a database. Every operator (identified by its UID) represents a namespace.
+Each operator state of an operator is mapped to a dedicated table in the namespace with a single column that holds the state's data of all tasks.
+All keyed states of an operator are mapped to a single table consisting of a column for the key, and one column for each keyed state.
+The following figure shows how a savepoint of MyApp is mapped to a database.
 
-In general, the savepoint ↔ database relationship can be summarized as:
+<p style="display: block; text-align: center; margin-top: 20px; margin-bottom: 20px">
+	<img src="{{ site.baseurl }}/fig/database-my-app-state-processor-api.png" width="600px" alt="Database: My App"/>
+</p>
 
-    * A savepoint is a database
-    * An operator is a namespace named by its uid
-    * Each operator state represents a single table
-        * Each element in an operator state represents a single row in that table
-    * Each operator containing keyed state has a single “keyed_state” table
-        * Each keyed_state table has one key column mapping the key value of the operator
-        * Each registered state represents a single column in the table
-        * Each row in the table maps to a single key
+The figure shows how the values of Src's operator state are mapped to a table with one column and five rows, one row for each list entry across all parallel tasks of Src.
+Operator state os2 of the operator “Proc” is similarly mapped to an individual table.
+The keyed states ks1 and ks2 are combined into a single table with three columns: one for the key, one for ks1, and one for ks2.
+The keyed table holds one row for each distinct key of both keyed states.
+Since the operator “Snk” does not have any state, its namespace is empty.
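+
+Expressed with the reading API that is introduced in the next section, this database view translates roughly into the following sketch (the UIDs and state names follow the figure; the state types are assumed here purely for illustration):
+
+{% highlight java %}
+ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
+
+// Namespace "Src": operator state os1 is read as a DataSet with a single column.
+DataSet<Integer> os1 = savepoint.readListState("Src", "os1", Types.INT);
+
+// Namespace "Proc": operator state os2 is read as another single-column DataSet ...
+DataSet<Integer> os2 = savepoint.readListState("Proc", "os2", Types.INT);
+
+// ... while the keyed states ks1 and ks2 form one keyed table that is read together
+// with a KeyedStateReaderFunction, as described later on this page.
+{% endhighlight %}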
 
 ## Reading State
 
-Reading state begins by specifiying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
-The compatability guarantees for restoring state are identical to those when restoring a `DataStream` application.
+Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
+The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment bEnv   = ExecutionEnvironment.getExecutionEnvironment();
-ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend());
+ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend());
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val bEnv      = ExecutionEnvironment.getExecutionEnvironment()
-val savepoint = Savepoint.load(bEnv, "hdfs://path/", new RocksDBStateBackend())
+val savepoint = Savepoint.load(bEnv, "hdfs://path/", new MemoryStateBackend())
 {% endhighlight %}
 </div>
 </div>
 
-When reading operator state, simply specify the operator uid, state name, and type information.
+### Operator State
 
 Review comment:
  I think adding the following generalized sentence here, before expanding on the different kinds of operator state, is helpful, especially for understanding the parameters given in the examples that follow:
   
   >When reading operator state, users specify the operator uid, the state name, and the type information.
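
  Perhaps followed by a short sketch that makes those parameters concrete (the uid and state name below are just placeholders):

  ```java
  DataSet<Integer> listState = savepoint.readListState("my-uid", "list-state", Types.INT);
  ```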

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services