Posted to issues@flink.apache.org by kl0u <gi...@git.apache.org> on 2017/01/16 13:18:29 UTC

[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

GitHub user kl0u opened a pull request:

    https://github.com/apache/flink/pull/3130

    [FLINK-5502] Add the function migration guide in docs.

    This PR introduces the migration guide from Flink-1.1 to Flink-1.2 for user-defined functions.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kl0u/flink migration-doc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3130.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3130
    
----
commit 769430ca5a4187f534cd8e9fd50b259369884d5d
Author: kl0u <kk...@gmail.com>
Date:   2017-01-13T14:52:04Z

    [FLINK-5502] Add the function migration guide in docs.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96613305
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    --- End diff --
    
    This would be better as "namely **keyed** and **non-keyed** state" -- but also, do we want to agree on terminology? Stephan used "operator state" in the intro doc.



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96614280
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped by key input stream of the form 
    --- End diff --
    
    I suggest either
    
    > a grouped-by-key input stream
    
    or
    
    > a keyed input stream




[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96629618
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped by key input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind the rescaling of both keyed and non-keyed state can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as those of their counterparts in the old `Checkpointed` interface. The only difference 
    +is that `snapshotState()` should now return a list of objects to checkpoint, as stated earlier, and 
    +`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threshold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSinkListCheckpointed(int threshold) {
    +            this.threshold = threshold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            this.bufferedElements.add(value);
    +            if (bufferedElements.size() == threshold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public List<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long timestamp) throws Exception {
    +            return this.bufferedElements;
    +        }
    +
    +        @Override
    +        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
    +            if (!state.isEmpty()) {
    +                this.bufferedElements.addAll(state);
    +            }
    +        }
    +        
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards 
    +compatibility reasons and more details will be explained in the end of this section.
    +
    +##### CheckpointedFunction
    +
    +The `CheckpointedFunction` interface requires again the implementation of two methods:
    +
    +    void snapshotState(FunctionSnapshotContext context) throws Exception;
    +
    +    void initializeState(FunctionInitializationContext context) throws Exception;
    +
    +The difference here is that although, as in Flink-1.1, the `snapshotState()` is called when a checkpoint 
    +is performed, the `initializeState()` which is the counterpart of the `restoreState()`, is called every time 
    +the user-defined function is initialized and not only in the case that we are recovering from a failure. 
    +Given this, the `initializeState()` is the place where the different types of the state can be initialized, 
    +but also where the state recovery logic is included. The code for the `BufferingSink` implementing the 
    +`CheckpointedFunction` interface is presented below.
    --- End diff --
    
    As in Flink-1.1, `snapshotState()` is called whenever a checkpoint 
    is performed, but now `initializeState()` (which is the counterpart of the `restoreState()`) is called every time 
    the user-defined function is initialized, rather than only in the case that we are recovering from a failure. 
    Given this, `initializeState()` is not only the place where different types of state are initialized, 
    but also where state recovery logic is included. An implementation of the `CheckpointedFunction` interface for `BufferingSink` is presented below.
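
    The lifecycle described in this suggestion can be illustrated with a minimal plain-Java sketch. It does not use the Flink API: `InitContext` and `BufferingSinkSketch` are hypothetical stand-ins invented for this illustration, and the buffer holds plain strings rather than tuples. The only point it tries to show is that `initializeState()` runs on every start, and that recovery is a branch inside it rather than a separate callback.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an initialization context; NOT the real Flink class.
final class InitContext {
    private final List<String> restored; // null when the function starts fresh

    InitContext(List<String> restored) {
        this.restored = restored;
    }

    boolean isRestored() {
        return restored != null;
    }

    List<String> restoredElements() {
        return restored;
    }
}

// Hypothetical sink sketch: state is set up in initializeState(), not in the constructor.
final class BufferingSinkSketch {
    private List<String> bufferedElements;

    // Called on every initialization, whether or not we are recovering.
    void initializeState(InitContext context) {
        bufferedElements = new ArrayList<>();
        if (context.isRestored()) {
            // Recovery logic lives in the same method as plain initialization.
            bufferedElements.addAll(context.restoredElements());
        }
    }

    void invoke(String value) {
        bufferedElements.add(value);
    }

    // Called when a checkpoint is performed; returns a copy of the buffer.
    List<String> snapshotState() {
        return new ArrayList<>(bufferedElements);
    }
}
```

    In this sketch a fresh start passes `new InitContext(null)`, while a recovery passes the list produced by an earlier `snapshotState()` call. Both paths go through the same `initializeState()` method.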



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96612039
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    --- End diff --
    
    This link can go to dev/stream/state.html



[GitHub] flink issue #3130: [FLINK-5502] Add the function migration guide in docs.

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3130
  
    @kl0u I reviewed and like your additions 👍 
    
    However, could you please have a look here: https://github.com/aljoscha/flink/tree/pr-3130-migration-documentation I'm suggesting a small reshuffling at the beginning and a rename of the captions. Also, I changed Flink-1.1 and Flink-1.2 to Flink 1.1 and Flink 1.2, respectively, because I think we never use the dash when referring to Flink versions.



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96628840
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFunction` which assumes a grouped by key input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind the rescaling of both keyed and non-keyed state can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as those of their counterparts in the old `Checkpointed` interface. The only difference 
    +is that `snapshotState()` should now return a list of objects to checkpoint, as stated earlier, and 
    +`restoreState()` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return `Collections.singletonList(MY_STATE)` from `snapshotState()`. The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threshold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSinkListCheckpointed(int threshold) {
    +            this.threshold = threshold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            this.bufferedElements.add(value);
    +            if (bufferedElements.size() == threshold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public List<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long timestamp) throws Exception {
    +            return this.bufferedElements;
    +        }
    +
    +        @Override
    +        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
    +            if (!state.isEmpty()) {
    +                this.bufferedElements.addAll(state);
    +            }
    +        }
    +        
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards 
    +compatibility reasons and more details will be explained in the end of this section.
    --- End diff --
    
    ... will be explained at the end ...
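
    For readers of the quoted diff, the claim that list elements are the finest granularity at which non-keyed state can be repartitioned can be sketched in plain Java. This is only an illustration: the round-robin assignment below is an assumption made for the example, the diff does not say how Flink actually assigns elements to tasks, and `ListStateRedistribution` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates the idea only, not Flink's actual strategy.
final class ListStateRedistribution {

    // Splits the checkpointed list across newParallelism tasks.
    // Round-robin is an assumption chosen for this sketch.
    static <T> List<List<T>> redistribute(List<T> checkpointed, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        List<List<T>> perTask = new ArrayList<>();
        for (int task = 0; task < newParallelism; task++) {
            perTask.add(new ArrayList<>());
        }
        for (int i = 0; i < checkpointed.size(); i++) {
            // Each element moves as a whole; it is never split further.
            perTask.get(i % newParallelism).add(checkpointed.get(i));
        }
        return perTask;
    }
}
```

    With the two elements `(test1, 2)` and `(test2, 2)` and a new parallelism of 2, each task receives one element in this sketch. If the whole state is wrapped with `Collections.singletonList(...)`, there is only one element to hand out, so a single task receives all of it and the others start empty, which is why that wrapping suits state that is not re-partitionable.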



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96630180
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind rescaling of both keyed state and non-keyed state is done can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threashold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSinkListCheckpointed(int threashold) {
    +            this.threashold = threashold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            this.bufferedElements.add(value);
    +            if (bufferedElements.size() == threashold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public List<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long timestamp) throws Exception {
    +            return this.bufferedElements;
    +        }
    +
    +        @Override
    +        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
    +            if (!state.isEmpty()) {
    +                this.bufferedElements.addAll(state);
    +            }
    +        }
    +        
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards 
    +compatibility reasons and more details will be explained in the end of this section.
    +
    +##### CheckpointedFunction
    +
    +The `CheckpointedFunction` interface requires again the implementation of two methods:
    +
    +    void snapshotState(FunctionSnapshotContext context) throws Exception;
    +
    +    void initializeState(FunctionInitializationContext context) throws Exception;
    +
    +The difference here is that although, as in Flink-1.1, the `snapshotState()` is called when a checkpoint 
    +is performed, the `initializeState()` which is the counterpart of the `restoreState()`, is called every time 
    +the user-defined function is initialized and not only in the case that we are recovering from a failure. 
    +Given this, the `initializeState()` is the place where the different types of the state can be initialized, 
    +but also where the state recovery logic is included. The code for the `BufferingSink` implementing the 
    +`CheckpointedFunction` interface is presented below.
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    +            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threshold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSink(int threshold) {
    +            this.threshold = threshold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            bufferedElements.add(value);
    +            if (bufferedElements.size() == threshold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +            checkpointedState.clear();
    +            for (Tuple2<String, Integer> element : bufferedElements) {
    +                checkpointedState.add(element);
    +            }
    +        }
    +
    +        @Override
    +        public void initializeState(FunctionInitializationContext context) throws Exception {
    +            checkpointedState = context.getOperatorStateStore().
    +                getSerializableListState("buffered-elements");
    +
    +            if (context.isRestored()) {
    +                for (Tuple2<String, Integer> element : checkpointedState.get()) {
    +                    bufferedElements.add(element);
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize 
    +the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects 
    +are going to be stored upon checkpointing:
    +
    +`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
    +
    +After initializing the container, we check if we are recovering after failure using the `isRestored()` method 
    +of the context. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
    +
    +As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state 
    +initialization is kept in a class variable for future use in the `snapshotState()`. There, 
    +the `ListState` is initially cleared from all the objects included there from the previous checkpoint 
    +and then it is filled with the new ones we want to checkpoint.
    +
    --- End diff --
    
    There the `ListState` is cleared of all objects included by the previous checkpoint,
    and is then filled with the new ones we want to checkpoint.



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96628699
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. The updated code for the `BufferingSink` 
    --- End diff --
    
    snapshotState has a typo



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96628469
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    --- End diff --
    
    interface, or `ListCheckpointed<T extends Serializable>`,
    
    (drop the)



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96628280
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    --- End diff --
    
    ... the rest of this section focuses on ...



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u closed the pull request at:

    https://github.com/apache/flink/pull/3130



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96612338
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    --- End diff --
    
    Perhaps it would be clearer to say "from a savepoint generated by its Flink-1.1 predecessor"



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96630636
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind rescaling of both keyed state and non-keyed state is done can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threashold;
    --- End diff --
    
    threshold is frequently misspelled in this example
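
For readers following the quoted guide text, the rescaling paragraph in the diff above (list elements as the finest granularity of redistribution) can be illustrated with a small, Flink-free sketch. Everything here is hypothetical: `ListStateRescaleSketch` and `redistribute` are not Flink APIs, and the round-robin assignment is an assumption chosen for illustration, not necessarily the strategy Flink applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Flink-free illustration; none of these names are Flink APIs.
final class ListStateRescaleSketch {

    // Splits a checkpointed list across `parallelism` subtasks.
    // Round-robin assignment is an assumption made for this sketch only.
    static <T> List<List<T>> redistribute(List<T> checkpointed, int parallelism) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be >= 1");
        }
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        // Each list element is moved as a whole; elements are never split.
        for (int i = 0; i < checkpointed.size(); i++) {
            perSubtask.get(i % parallelism).add(checkpointed.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // State of a sink that ran with parallelism 1, restored with parallelism 2.
        List<String> state = Arrays.asList("(test1, 2)", "(test2, 2)");
        List<List<String>> split = redistribute(state, 2);
        System.out.println("subtask 0: " + split.get(0));
        System.out.println("subtask 1: " + split.get(1));
    }
}
```

The point of the sketch is only that each element of the returned list must be meaningful on its own, since any subtask may receive it after the parallelism changes.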



[GitHub] flink issue #3130: [FLINK-5502] Add the function migration guide in docs.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3130
  
    Thanks @alpinegizmo for the review! I integrated your comments.



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96613974
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    --- End diff --
    
    inconsistent indentation?
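
As a consistently indented point of comparison, the per-key counting logic of the quoted `CountMapper` can be sketched without Flink. This is a minimal sketch under stated assumptions: the `HashMap` stands in for Flink's keyed `ValueState`, and `CountPerKeySketch` and `onElement` are hypothetical names that do not appear in the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Flink-free stand-in for the keyed counter; names are hypothetical.
final class CountPerKeySketch {

    private final int numberElements;

    // One counter per key, playing the role of ValueState<Integer>.
    private final Map<String, Integer> counters = new HashMap<>();

    CountPerKeySketch(int numberElements) {
        if (numberElements < 1) {
            throw new IllegalArgumentException("numberElements must be >= 1");
        }
        this.numberElements = numberElements;
    }

    // Returns the count to emit for `word`, or empty if nothing is emitted yet.
    Optional<Integer> onElement(String word) {
        int count = counters.getOrDefault(word, 0) + 1;
        if (count == numberElements) {
            counters.put(word, 0); // reset to 0, as in the quoted code
            return Optional.of(count);
        }
        counters.put(word, count);
        return Optional.empty();
    }
}
```

Because the counter is reset after each emission, checking `count == numberElements` here behaves the same as the `count % numberElements == 0` test in the quoted diff.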



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96630446
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind rescaling of both keyed state and non-keyed state is done can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threashold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSinkListCheckpointed(int threashold) {
    +            this.threashold = threashold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            this.bufferedElements.add(value);
    +            if (bufferedElements.size() == threashold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public List<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long timestamp) throws Exception {
    +            return this.bufferedElements;
    +        }
    +
    +        @Override
    +        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
    +            if (!state.isEmpty()) {
    +                this.bufferedElements.addAll(state);
    +            }
    +        }
    +        
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards 
    +compatibility reasons and more details will be explained in the end of this section.
    +
    +##### CheckpointedFunction
    +
    +The `CheckpointedFunction` interface requires again the implementation of two methods:
    +
    +    void snapshotState(FunctionSnapshotContext context) throws Exception;
    +
    +    void initializeState(FunctionInitializationContext context) throws Exception;
    +
    +The difference here is that although, as in Flink-1.1, the `snapshotState()` is called when a checkpoint 
    +is performed, the `initializeState()` which is the counterpart of the `restoreState()`, is called every time 
    +the user-defined function is initialized and not only in the case that we are recovering from a failure. 
    +Given this, the `initializeState()` is the place where the different types of the state can be initialized, 
    +but also where the state recovery logic is included. The code for the `BufferingSink` implementing the 
    +`CheckpointedFunction` interface is presented below.
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    +            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threshold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSink(int threshold) {
    +            this.threshold = threshold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            bufferedElements.add(value);
    +            if (bufferedElements.size() == threshold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +            checkpointedState.clear();
    +            for (Tuple2<String, Integer> element : bufferedElements) {
    +                checkpointedState.add(element);
    +            }
    +        }
    +
    +        @Override
    +        public void initializeState(FunctionInitializationContext context) throws Exception {
    +            checkpointedState = context.getOperatorStateStore().
    +                getSerializableListState("buffered-elements");
    +
    +            if (context.isRestored()) {
    +                for (Tuple2<String, Integer> element : checkpointedState.get()) {
    +                    bufferedElements.add(element);
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize 
    +the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects 
    +are going to be stored upon checkpointing:
    +
    +`this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");`
    +
    +After initializing the container, we check if we are recovering after failure using the `isRestored()` method 
    +of the context. If this is `true`, *i.e.* we are recovering, the restore logic is applied.
    +
    +As shown in the code of the modified `BufferingSink`, this `ListState` recovered during state 
    +initialization is kept in a class variable for future use in the `snapshotState()`. There, 
    +the `ListState` is initially cleared from all the objects included there from the previous checkpoint 
    +and then it is filled with the new ones we want to checkpoint.
    +
    +As a side note, the keyed state can also be initialized in the `initializeState()` method. This can be done 
    +using the `FunctionInitializationContext` given as argument, instead of the `RuntimeContext`, which is the case
    +for Flink-1.1. If the `CheckpointedFunction` interface was to be used in the case of the `CountMapper`, 
    +the old `open()` method could be removed and the new `snaphshotState()` and `initializeState()` methods 
    +would look like the following:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> 
    +            implements CheckpointedFunction {
    +
    +        private transient ValueState<Integer> counter;
    +
    +        private final int numberElements;
    +
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +            counter.update(count);
    +              
    +            if (count % numberElements == 0) {
    +                out.collect(Tuple2.of(value.f0, count));
    +             	counter.update(0); // reset to 0
    +             	}
    +            }
    +        }
    +
    +        @Override
    +        public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +            //all managed, nothing to do.
    +        }
    +
    +        @Override
    +        public void initializeState(FunctionInitializationContext context) throws Exception {
    +            counter = context.getKeyedStateStore().getState(
    +                new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +    }
    +
    +Notice that the `snaphotState()` method is empty as Flink itself takes care of snapshotting managed keyed state
    +upon checkpointing.
    +
    +#### Backwards compatibility with Flink-1.1
    +
    +So far we have seen how to modify our functions to take advantage of the new features introduced be Flink-1.2. 
    +The question that remains is "Can I make sure that my modified (Flink-1.2) job will start from where my already 
    +running job from Flink-1.1 stopped?".
    +
    +The answer is yes and the way to do it is pretty straight-forward. For the keyed state, you have to do nothing.
    --- End diff --
    
    straightforward is one word, no hyphen
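
The `CheckpointedFunction` life cycle described in the quoted diff (initialize the state container, restore only when `isRestored()` is true, clear and refill the container on every snapshot) can be traced with a small Flink-free sketch. All names are hypothetical: a plain `List<String>` stands in for `ListState`, and the `boolean` parameter stands in for `context.isRestored()`. The sketch also uses `>=` rather than `==` for the threshold check, an assumption made here so that a restored buffer that already exceeds the threshold is still flushed.

```java
import java.util.ArrayList;
import java.util.List;

// Flink-free trace of the snapshot/initialize cycle; not the PR's code.
final class BufferingSinkLifecycleSketch {

    private final int threshold;
    private final List<String> bufferedElements = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    // Stand-in for the ListState "container" obtained in initializeState().
    private List<String> checkpointedState;

    BufferingSinkLifecycleSketch(int threshold) {
        this.threshold = threshold;
    }

    // Called on every start; restore logic runs only when isRestored is true.
    void initializeState(List<String> container, boolean isRestored) {
        this.checkpointedState = container;
        if (isRestored) {
            bufferedElements.addAll(container);
        }
    }

    void invoke(String value) {
        bufferedElements.add(value);
        if (bufferedElements.size() >= threshold) {
            emitted.addAll(bufferedElements); // "send it to the sink"
            bufferedElements.clear();
        }
    }

    // Drop what the previous checkpoint stored, then store the current buffer.
    void snapshotState() {
        checkpointedState.clear();
        checkpointedState.addAll(bufferedElements);
    }

    List<String> emitted() {
        return emitted;
    }

    List<String> buffered() {
        return bufferedElements;
    }
}
```

A fresh instance would call `initializeState(new ArrayList<>(), false)`, while a recovering one would receive the previously snapshotted list together with `true`.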



[GitHub] flink pull request #3130: [FLINK-5502] Add the function migration guide in d...

Posted by alpinegizmo <gi...@git.apache.org>.
Github user alpinegizmo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3130#discussion_r96629758
  
    --- Diff: docs/dev/migration.md ---
    @@ -25,9 +25,326 @@ under the License.
     * This will be replaced by the TOC
     {:toc}
     
    -## Flink 1.1 to 1.2
    +## Flink Function Migration from 1.1 to 1.2
     
    -### State API
    +### Introduction
     
    -### Fast Processing Time Window Operators
    +As mentioned [here](link here) Flink has two types of state, namely the **keyed** and the **non-keyed** one.
    +Both types are available to user-defined functions and this document will guide you through the process 
    +of migrating your Flink-1.1 function code to Flink-1.2. 
     
    +The migration process will serve two goals:
    +
    +1. allow your functions to take advantage of the new features introduced in Flink-1.2, such as rescaling,
    +
    +2. make sure that your new Flink-1.2 job will be able to continue from where its Flink-1.1 predecessor stopped.
    +
    +As running examples for the remainder of this document we will use the `CountMapper` and the `BufferingSink`
    +functions. The first is an example of a function with **keyed** state, while 
    +the second has **non-keyed** state. The code for the aforementioned two functions in Flink-1.1 is presented below:
    +
    +    public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
    +    
    +        private transient ValueState<Integer> counter;
    +      
    +        private final int numberElements;
    +      
    +        public CountMapper(int numberElements) {
    +            this.numberElements = numberElements;
    +        }
    +      
    +        @Override
    +        public void open(Configuration parameters) throws Exception {
    +            counter = getRuntimeContext().getState(
    +      	        new ValueStateDescriptor<>("counter", Integer.class, 0));
    +        }
    +      
    +        @Override
    +        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {
    +            int count = counter.value() + 1;
    +      	    counter.update(count);
    +      
    +      	    if (count % numberElements == 0) {
    +     		    out.collect(Tuple2.of(value.f0, count));
    +     		    counter.update(0); // reset to 0
    +     	    }
    +        }
    +    }
    +
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, 
    +            Checkpointed<ArrayList<Tuple2<String, Integer>>> {
    +
    +	    private final int threshold;
    +
    +	    private ArrayList<Tuple2<String, Integer>> bufferedElements;
    +
    +	    BufferingSink(int threshold) {
    +		    this.threshold = threshold;
    +		    this.bufferedElements = new ArrayList<>();
    +	    }
    +
    +    	@Override
    +	    public void invoke(Tuple2<String, Integer> value) throws Exception {
    +		    bufferedElements.add(value);
    +		    if (bufferedElements.size() == threshold) {
    +			    for (Tuple2<String, Integer> element: bufferedElements) {
    +				    // send it to the sink
    +			    }
    +			    bufferedElements.clear();
    +		    }
    +	    }
    +
    +	    @Override
    +	    public ArrayList<Tuple2<String, Integer>> snapshotState(
    +	            long checkpointId, long checkpointTimestamp) throws Exception {
    +		    return bufferedElements;
    +	    }
    +
    +	    @Override
    +	    public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +	    	bufferedElements.addAll(state);
    +        }
    +    }
    +
    +
    +The `CountMapper` is a `RichFlatMapFuction` which assumes a grouped by key input stream of the form 
    +`(word, 1)`. The function keeps a counter for each incoming key (`ValueState<Integer> counter`) and if 
    +the number of occurrences of a certain word surpasses the user-provided threshold, a tuple is emitted 
    +containing the word itself and the number of occurrences.
    +
    +The `BufferingSink` is a `SinkFunction` that receives elements (potentially the output of the `CountMapper`)
    +and buffers them until a certain user-specified threshold is reached, before emitting them to the final sink. 
    +This is a common way to avoid many expensive calls to a database or an external storage system. To do the 
    +buffering in a fault-tolerant manner, the buffered elements are kept in a list (`bufferedElements`) which is 
    +periodically checkpointed.
    +
    +### Migration to Flink-1.2
    +
    +To leverage the new features of Flink-1.2, the code above should be modified to use the new state abstractions. 
    +After doing these changes, you will be able to change the parallelism of your job (scale up or down) and you 
    +are guaranteed that the new version of your job will start from where its predecessor left off.
    +
    +**Keyed State:** Something to note before delving into the details of the migration process is that if your function 
    +has **only keyed state**, then the exact same code from Flink-1.1 also works for Flink-1.2 with full support 
    +for the new features and full backwards compatibility. Changes could be made just for better code organization, 
    +but this is just a matter of style.
    +
    +With the above said, the rest of the paragraph focuses on the **non-keyed state**.
    +
    +#### Rescaling and new state abstractions
    +
    +The first modification is the transition from the old `Checkpointed<T extends Serializable>` state interface 
    +to the new ones. In Flink-1.2, a stateful function can implement either the more general `CheckpointedFunction` 
    +interface, or the `ListCheckpointed<T extends Serializable>`, which is semantically closer to the old 
    +`Checkpointed` one.
    +
    +In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, 
    +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which 
    +non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
    +contains elements `(test1, 2)` and `(test2, 2)` when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
    +while `(test2, 2)` will go to task 1. 
    +
    +More details on the principles behind rescaling of both keyed state and non-keyed state is done can be found 
    +[here](link here).
    +
    +##### ListCheckpointed
    +
    +The `ListCheckpointed` interface requires the implementation of two methods: 
    +
    +    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    +
    +    void restoreState(List<T> state) throws Exception;
    +
    +Their semantics are the same as their counterparts in the old `Checkpointed` interface. The only difference 
    +is that now the `snapshotState()` should return a list of objects to checkpoint, as stated earlier, and the 
    +`restoreState` has to handle this list upon recovery. If the state is not re-partitionable, you can always 
    +return a `Collections.singletonList(MY_STATE)` in the `snaphshotState()`. The updated code for the `BufferingSink` 
    +is included below:
    +
    +    public class BufferingSinkListCheckpointed implements 
    +            SinkFunction<Tuple2<String, Integer>>,
    +            ListCheckpointed<Tuple2<String, Integer>>, 
    +            CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threashold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSinkListCheckpointed(int threashold) {
    +            this.threashold = threashold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            this.bufferedElements.add(value);
    +            if (bufferedElements.size() == threashold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public List<Tuple2<String, Integer>> snapshotState(
    +                long checkpointId, long timestamp) throws Exception {
    +            return this.bufferedElements;
    +        }
    +
    +        @Override
    +        public void restoreState(List<Tuple2<String, Integer>> state) throws Exception {
    +            if (!state.isEmpty()) {
    +                this.bufferedElements.addAll(state);
    +            }
    +        }
    +        
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +As shown in the code, the updated function also implements the `CheckpointedRestoring` interface. This is for backwards 
    +compatibility reasons and more details will be explained in the end of this section.
    +
    +##### CheckpointedFunction
    +
    +The `CheckpointedFunction` interface requires again the implementation of two methods:
    +
    +    void snapshotState(FunctionSnapshotContext context) throws Exception;
    +
    +    void initializeState(FunctionInitializationContext context) throws Exception;
    +
    +The difference here is that although, as in Flink-1.1, the `snapshotState()` is called when a checkpoint 
    +is performed, the `initializeState()` which is the counterpart of the `restoreState()`, is called every time 
    +the user-defined function is initialized and not only in the case that we are recovering from a failure. 
    +Given this, the `initializeState()` is the place where the different types of the state can be initialized, 
    +but also where the state recovery logic is included. The code for the `BufferingSink` implementing the 
    +`CheckpointedFunction` interface is presented below.
    +
    +    public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,
    +            CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {
    +
    +        private final int threshold;
    +
    +        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    +
    +        private List<Tuple2<String, Integer>> bufferedElements;
    +
    +        public BufferingSink(int threshold) {
    +            this.threshold = threshold;
    +            this.bufferedElements = new ArrayList<>();
    +        }
    +
    +        @Override
    +        public void invoke(Tuple2<String, Integer> value) throws Exception {
    +            bufferedElements.add(value);
    +            if (bufferedElements.size() == threshold) {
    +                for (Tuple2<String, Integer> element: bufferedElements) {
    +                    // send it to the sink
    +                }
    +                bufferedElements.clear();
    +            }
    +        }
    +
    +        @Override
    +        public void snapshotState(FunctionSnapshotContext context) throws Exception {
    +            checkpointedState.clear();
    +            for (Tuple2<String, Integer> element : bufferedElements) {
    +                checkpointedState.add(element);
    +            }
    +        }
    +
    +        @Override
    +        public void initializeState(FunctionInitializationContext context) throws Exception {
    +            checkpointedState = context.getOperatorStateStore().
    +                getSerializableListState("buffered-elements");
    +
    +            if (context.isRestored()) {
    +                for (Tuple2<String, Integer> element : checkpointedState.get()) {
    +                    bufferedElements.add(element);
    +                }
    +            }
    +        }
    +
    +        @Override
    +        public void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {
    +            // this is from the CheckpointedRestoring interface.
    +            this.bufferedElements.addAll(state);
    +        }
    +    }
    +
    +The `initializeState` takes as argument a `FunctionInitializationContext`. This is used to initialize 
    +the non-keyed state "container". This is a container of type `ListState` where the non-keyed state objects 
    +are going to be stored upon checkpointing:
    --- End diff --
    
    This is a container of type `ListState` where non-keyed state objects are stored upon checkpointing:
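
The passage quoted above says that the entries of the non-keyed `ListState` are independent of each other and can therefore be handed out to different subtasks when the parallelism changes. A minimal sketch of that idea in plain Java follows. It does not use any Flink API: the class name, the `redistribute` helper, and the round-robin assignment are all assumptions made for illustration, not a description of how Flink performs the redistribution internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ListStateRedistributionSketch {

    /**
     * Hypothetical helper (not part of Flink): assigns each checkpointed
     * element to one of the new subtasks in round-robin order. Each element
     * is moved as a whole, since it is the finest unit of redistribution.
     */
    static <T> List<List<T>> redistribute(List<T> checkpointed, int newParallelism) {
        if (newParallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        // One (initially empty) state list per new subtask.
        List<List<T>> perSubtask = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            perSubtask.add(new ArrayList<>());
        }
        // Element i goes to subtask i modulo the new parallelism.
        for (int i = 0; i < checkpointed.size(); i++) {
            perSubtask.get(i % newParallelism).add(checkpointed.get(i));
        }
        return perSubtask;
    }

    public static void main(String[] args) {
        // State checkpointed with parallelism 1, as in the BufferingSink example.
        List<String> checkpointed = Arrays.asList("(test1, 2)", "(test2, 2)");

        // Restore with parallelism 2: each subtask receives its own sub-list.
        List<List<String>> restored = redistribute(checkpointed, 2);
        for (int task = 0; task < restored.size(); task++) {
            System.out.println("task " + task + " restores " + restored.get(task));
        }
    }
}
```

With two elements and a new parallelism of 2, each subtask ends up with one element, which matches the `(test1, 2)` / `(test2, 2)` example in the quoted text. A function whose state cannot be split this way would keep it as a single list entry, as the guide suggests with `Collections.singletonList(...)`.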



[GitHub] flink issue #3130: [FLINK-5502] Add the function migration guide in docs.

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/3130
  
    Thanks, @aljoscha. I checked the changes and they look good!



[GitHub] flink issue #3130: [FLINK-5502] Add the function migration guide in docs.

Posted by aljoscha <gi...@git.apache.org>.
Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/3130
  
    Thanks! 😄 I merged your changes, could you please close this PR?

