Posted to commits@flink.apache.org by tz...@apache.org on 2019/07/04 07:10:34 UTC

[flink] 03/11: [FLINK-12963] [docs] Document savepoint writer for bootstrapping new savepoints

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03bb097acf3594f1b445bd8a59acbbae2d34ad15
Author: Seth Wiesman <sj...@gmail.com>
AuthorDate: Mon Jun 24 16:11:46 2019 -0500

    [FLINK-12963] [docs] Document savepoint writer for bootstrapping new savepoints
---
 docs/dev/libs/state_processor_api.md               | 151 +++++++++++++++++++--
 ..._processor_api.md => state_processor_api.zh.md} | 151 +++++++++++++++++++--
 2 files changed, 280 insertions(+), 22 deletions(-)

diff --git a/docs/dev/libs/state_processor_api.md b/docs/dev/libs/state_processor_api.md
index ae8d059..676ac49 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.md
@@ -23,8 +23,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink State Processor API provides powerful functionality to reading savepoints and checkpoints using Flink’s batch DataSet api. 
-This is useful for analyzing state for interesting patterns and troubleshooting or auditing jobs by checking for discrepancies.
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
+This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
 
 * This will be replaced by the TOC
 {:toc}
@@ -70,8 +70,8 @@ class Summarize extends RichFlatMapFunction<Transaction, Summary> {
   transient ValueState<Integer> countState;
  
   public void open(Configuration configuration) throws Exception {
-     totalState = getRuntimeContext().getState(new ValueStateDescriptor("total", Types.DOUBLE));
-     countState = getRuntimeContext().getState(new ValueStateDescriptor("count", Types.INT));
+     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
+     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
   }
  
   public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
@@ -108,10 +108,12 @@ transactions
 
 This job contains multiple operators along with various kinds of state.
 When analyzing that state we can first scope data by its operator, named by setting its uid.
-Within each operator we can look at the registered states. CurrencyConverter has a broadcast state, which is a type of non-partitioned operator state.
-In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row. Contrast this with Summarize, which contains two keyed states.
+Within each operator we can look at the registered states.
+`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state.
+In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row.
+Contrast this with `Summarize`, which contains two keyed states.
 Because both states are scoped under the same key we can safely assume there exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator containing one “key” column along with n value columns, one for each registered state.
+Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state.
 All of this means that the state for this job could be described using the following pseudo-sql commands. 
 
 {% highlight sql %}
@@ -144,7 +146,7 @@ In general, the savepoint ↔ database relationship can be summarized as:
 ## Reading State
 
 Reading state begins by specifiying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
-The compatability guaruntees for restoring state are identical to those when restoring a `DataStream` application.
+The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -253,7 +255,7 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
 {% highlight scala %}
 public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] {
  
-  @transient var state: ValueState[Integer];
+  var state: ValueState[Integer] = _
  
    @throws[Exception]
    override def open(Configuration parameters) {
@@ -308,7 +310,7 @@ DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFu
 case class KeyedState(key: Int, value: Int)
  
 class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
-  @transient var state ValueState[Integer];
+  var state: ValueState[Integer] = _
  
   override def open(Configuration parameters) {
      state = getRuntimeContext().getState(stateDescriptor);
@@ -327,4 +329,131 @@ class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
 val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
 {% endhighlight %}
 </div>
-</div>
\ No newline at end of file
+</div>
+
+{% panel **Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of `open`. Any attempt to call `RuntimeContext#getState`, `RuntimeContext#getListState`, or `RuntimeContext#getMapState` outside of `open` will result in a `RuntimeException`. %}
+
+## Writing New Savepoints
+
+State writers are based around the abstraction of `Savepoint`, where one `Savepoint` may have many operators and the state for any particular operator is created using a `BootstrapTransformation`.
+
+A `BootstrapTransformation` starts with a `DataSet` containing the values that are to be written into state.
+The transformation may optionally be keyed, depending on whether you are writing keyed or operator state.
+Finally, a bootstrap function is applied depending on the transformation; Flink supplies `KeyedStateBootstrapFunction` for writing keyed state, `StateBootstrapFunction` for writing non-keyed state, and `BroadcastStateBootstrapFunction` for writing broadcast state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class Account {
+    public int id;
+
+    public double amount;
+
+    public long timestamp;
+}
+ 
+public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
+    ValueState<Double> state;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
+        state = getRuntimeContext().getState(descriptor);
+    }
+
+    @Override
+    public void processElement(Account value, Context ctx) throws Exception {
+        state.update(value.amount);
+    }
+}
+ 
+ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
+
+BootstrapTransformation<Account> transformation = OperatorTransformation
+    .bootstrapWith(accountDataSet)
+    .keyBy(acc -> acc.id)
+    .transform(new AccountBootstrapper());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+case class Account(id: Int, amount: Double, timestamp: Long)
+ 
+class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
+    var state: ValueState[Double] = _
+
+    override def open(parameters: Configuration): Unit = {
+        val descriptor = new ValueStateDescriptor[Double]("total", Types.DOUBLE)
+        state = getRuntimeContext().getState(descriptor)
+    }
+
+    @throws[Exception]
+    override def processElement(value: Account, ctx: Context): Unit = {
+        state.update(value.amount)
+    }
+}
+ 
+val bEnv = ExecutionEnvironment.getExecutionEnvironment()
+
+val accountDataSet = bEnv.fromCollection(accounts)
+
+val transformation = OperatorTransformation
+    .bootstrapWith(accountDataSet)
+    .keyBy(acc => acc.id)
+    .transform(new AccountBootstrapper())
+{% endhighlight %}
+</div>
+</div>
+
+The `KeyedStateBootstrapFunction` supports setting event time and processing time timers.
+The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application.
+If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start.
+
+Once one or more transformations have been created, they may be combined into a single `Savepoint`.
+A `Savepoint` is created using a state backend and a max parallelism, and may contain any number of operators.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Savepoint
+    .create(backend, 128)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+Savepoint
+    .create(backend, 128)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath)
+{% endhighlight %}
+</div>
+</div>
+
+Besides creating a savepoint from scratch, you can base one off an existing savepoint, such as when bootstrapping a single new operator for an existing job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Savepoint
+    .load(backend, oldPath)
+    .withOperator("uid", transformation)
+    .write(newPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+Savepoint
+    .load(backend, oldPath)
+    .withOperator("uid", transformation)
+    .write(newPath)
+{% endhighlight %}
+</div>
+</div>
+
+{% panel **Note:** When basing a new savepoint on existing state, the State Processor API makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other! %}
diff --git a/docs/dev/libs/state_processor_api.md b/docs/dev/libs/state_processor_api.zh.md
similarity index 63%
copy from docs/dev/libs/state_processor_api.md
copy to docs/dev/libs/state_processor_api.zh.md
index ae8d059..676ac49 100644
--- a/docs/dev/libs/state_processor_api.md
+++ b/docs/dev/libs/state_processor_api.zh.md
@@ -23,8 +23,8 @@ specific language governing permissions and limitations
 under the License.
 -->
 
-Apache Flink State Processor API provides powerful functionality to reading savepoints and checkpoints using Flink’s batch DataSet api. 
-This is useful for analyzing state for interesting patterns and troubleshooting or auditing jobs by checking for discrepancies.
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
+This is useful for tasks such as analyzing state for interesting patterns, troubleshooting or auditing jobs by checking for discrepancies, and bootstrapping state for new applications.
 
 * This will be replaced by the TOC
 {:toc}
@@ -70,8 +70,8 @@ class Summarize extends RichFlatMapFunction<Transaction, Summary> {
   transient ValueState<Integer> countState;
  
   public void open(Configuration configuration) throws Exception {
-     totalState = getRuntimeContext().getState(new ValueStateDescriptor("total", Types.DOUBLE));
-     countState = getRuntimeContext().getState(new ValueStateDescriptor("count", Types.INT));
+     totalState = getRuntimeContext().getState(new ValueStateDescriptor<>("total", Types.DOUBLE));
+     countState = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Types.INT));
   }
  
   public void flatMap(Transaction value, Collector<Summary> out) throws Exception {
@@ -108,10 +108,12 @@ transactions
 
 This job contains multiple operators along with various kinds of state.
 When analyzing that state we can first scope data by its operator, named by setting its uid.
-Within each operator we can look at the registered states. CurrencyConverter has a broadcast state, which is a type of non-partitioned operator state.
-In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row. Contrast this with Summarize, which contains two keyed states.
+Within each operator we can look at the registered states.
+`CurrencyConverter` has a broadcast state, which is a type of non-partitioned operator state.
+In general, there is no relationship between any two elements in an operator state and so we can look at each value as being its own row.
+Contrast this with `Summarize`, which contains two keyed states.
 Because both states are scoped under the same key we can safely assume there exists some relationship between the two values.
-Therefore, keyed state is best understood as a single table per operator containing one “key” column along with n value columns, one for each registered state.
+Therefore, keyed state is best understood as a single table per operator containing one _key_ column along with _n_ value columns, one for each registered state.
 All of this means that the state for this job could be described using the following pseudo-sql commands. 
 
 {% highlight sql %}
@@ -144,7 +146,7 @@ In general, the savepoint ↔ database relationship can be summarized as:
 ## Reading State
 
 Reading state begins by specifiying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
-The compatability guaruntees for restoring state are identical to those when restoring a `DataStream` application.
+The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -253,7 +255,7 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
 {% highlight scala %}
 public class StatefulFunctionWithTime extends KeyedProcessFunction[Integer, Integer, Void] {
  
-  @transient var state: ValueState[Integer];
+  var state: ValueState[Integer] = _
  
    @throws[Exception]
    override def open(Configuration parameters) {
@@ -308,7 +310,7 @@ DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFu
 case class KeyedState(key: Int, value: Int)
  
 class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
-  @transient var state ValueState[Integer];
+  var state: ValueState[Integer] = _
  
   override def open(Configuration parameters) {
      state = getRuntimeContext().getState(stateDescriptor);
@@ -327,4 +329,131 @@ class ReaderFunction extends KeyedStateReaderFunction[Integer, KeyedState] {
 val keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
 {% endhighlight %}
 </div>
-</div>
\ No newline at end of file
+</div>
+
+{% panel **Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of `open`. Any attempt to call `RuntimeContext#getState`, `RuntimeContext#getListState`, or `RuntimeContext#getMapState` outside of `open` will result in a `RuntimeException`. %}
+
+## Writing New Savepoints
+
+State writers are based around the abstraction of `Savepoint`, where one `Savepoint` may have many operators and the state for any particular operator is created using a `BootstrapTransformation`.
+
+A `BootstrapTransformation` starts with a `DataSet` containing the values that are to be written into state.
+The transformation may optionally be keyed, depending on whether you are writing keyed or operator state.
+Finally, a bootstrap function is applied depending on the transformation; Flink supplies `KeyedStateBootstrapFunction` for writing keyed state, `StateBootstrapFunction` for writing non-keyed state, and `BroadcastStateBootstrapFunction` for writing broadcast state.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public class Account {
+    public int id;
+
+    public double amount;
+
+    public long timestamp;
+}
+ 
+public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Account> {
+    ValueState<Double> state;
+
+    @Override
+    public void open(Configuration parameters) {
+        ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("total", Types.DOUBLE);
+        state = getRuntimeContext().getState(descriptor);
+    }
+
+    @Override
+    public void processElement(Account value, Context ctx) throws Exception {
+        state.update(value.amount);
+    }
+}
+ 
+ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
+
+BootstrapTransformation<Account> transformation = OperatorTransformation
+    .bootstrapWith(accountDataSet)
+    .keyBy(acc -> acc.id)
+    .transform(new AccountBootstrapper());
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+case class Account(id: Int, amount: Double, timestamp: Long)
+ 
+class AccountBootstrapper extends KeyedStateBootstrapFunction[Integer, Account] {
+    var state: ValueState[Double] = _
+
+    override def open(parameters: Configuration): Unit = {
+        val descriptor = new ValueStateDescriptor[Double]("total", Types.DOUBLE)
+        state = getRuntimeContext().getState(descriptor)
+    }
+
+    @throws[Exception]
+    override def processElement(value: Account, ctx: Context): Unit = {
+        state.update(value.amount)
+    }
+}
+ 
+val bEnv = ExecutionEnvironment.getExecutionEnvironment()
+
+val accountDataSet = bEnv.fromCollection(accounts)
+
+val transformation = OperatorTransformation
+    .bootstrapWith(accountDataSet)
+    .keyBy(acc => acc.id)
+    .transform(new AccountBootstrapper())
+{% endhighlight %}
+</div>
+</div>
+
+The `KeyedStateBootstrapFunction` supports setting event time and processing time timers.
+The timers will not fire inside the bootstrap function and only become active once restored within a `DataStream` application.
+If a processing time timer is set but the state is not restored until after that time has passed, the timer will fire immediately upon start.
+
+Once one or more transformations have been created, they may be combined into a single `Savepoint`.
+A `Savepoint` is created using a state backend and a max parallelism, and may contain any number of operators.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Savepoint
+    .create(backend, 128)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+Savepoint
+    .create(backend, 128)
+    .withOperator("uid1", transformation1)
+    .withOperator("uid2", transformation2)
+    .write(savepointPath)
+{% endhighlight %}
+</div>
+</div>
+
+Besides creating a savepoint from scratch, you can base one off an existing savepoint, such as when bootstrapping a single new operator for an existing job.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Savepoint
+    .load(backend, oldPath)
+    .withOperator("uid", transformation)
+    .write(newPath);
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+Savepoint
+    .load(backend, oldPath)
+    .withOperator("uid", transformation)
+    .write(newPath)
+{% endhighlight %}
+</div>
+</div>
+
+{% panel **Note:** When basing a new savepoint on existing state, the State Processor API makes a shallow copy of the pointers to the existing operators. This means that both savepoints share state and one cannot be deleted without corrupting the other! %}