Posted to user@beam.apache.org by Dmitry Minaev <mi...@gmail.com> on 2018/11/12 18:43:15 UTC

ValueState for Dataflow runner and MapState for others

Hi everyone,

Since Dataflow doesn't support MapState (
https://issues.apache.org/jira/browse/BEAM-1474), I'm thinking of using
ValueState with a Map<> inside it. Is that a good idea? Here is some example
code:
```
@StateId("myValueStore")
private final StateSpec<ValueState<Map<String, String>>> valueStoreSpec =
    StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

@ProcessElement
public void processElement(
    ProcessContext context,
    // The parameter type must match the StateSpec: ValueState<Map<...>>, not MapState.
    @StateId("myValueStore") ValueState<Map<String, String>> valueStore) {
  ...
}
```
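For illustration only (not from the thread), here is a minimal sketch of what the elided body might do, assuming a hypothetical DoFn named ValueStateMapFn whose input elements are KV<key, KV<field, value>>. It reads the whole map, updates a copy, and writes it back, which also shows why the cost of this approach grows with the size of the map.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn: for each key, keeps a field -> value map in a
// single ValueState and emits the number of fields stored so far.
class ValueStateMapFn extends DoFn<KV<String, KV<String, String>>, KV<String, Integer>> {

  @StateId("myValueStore")
  private final StateSpec<ValueState<Map<String, String>>> valueStoreSpec =
      StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("myValueStore") ValueState<Map<String, String>> valueStore) {
    // read() returns null until a value has been written for this key and window.
    Map<String, String> current = valueStore.read();
    // Work on a copy so the change is made only through write().
    Map<String, String> updated = (current == null) ? new HashMap<>() : new HashMap<>(current);

    KV<String, String> field = context.element().getValue();
    updated.put(field.getKey(), field.getValue());

    // write() stores the whole map as one value, so each update's cost
    // grows with the size of the map.
    valueStore.write(updated);
    context.output(KV.of(context.element().getKey(), updated.size()));
  }
}
```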

I'd like to support other runners as well (e.g. FlinkRunner), and it seems
to be more efficient to use MapState when I need to store a map of values.
So I'm looking for a way to use MapState on some runners and ValueState on
others.

I understand how to get the runner at runtime via pipeline options, but I'm
not sure how to approach defining (and using) a different StateSpec for
each runner.

Here is some sample code for MapState:
```
@StateId("myMapStore")
private final StateSpec<MapState<String, String>> mapStoreSpec =
    StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());

@ProcessElement
public void processElement(
    ProcessContext context,
    @StateId("myMapStore") MapState<String, String> mapStore) {
  ...
}
```
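For comparison, a minimal sketch of the MapState version under similar assumptions (a hypothetical DoFn named MapStateFn with input KV<key, KV<field, value>>), emitting the old value whenever an entry is overwritten. Each element touches a single entry through get(field).read() and put(field, value), which is why MapState can be cheaper for large maps on runners that store map entries individually.

```java
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Hypothetical stateful DoFn: for each key, keeps field -> value entries in a
// MapState and emits (field, previousValue) whenever an entry is overwritten.
class MapStateFn extends DoFn<KV<String, KV<String, String>>, KV<String, String>> {

  @StateId("myMapStore")
  private final StateSpec<MapState<String, String>> mapStoreSpec =
      StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
      ProcessContext context,
      @StateId("myMapStore") MapState<String, String> mapStore) {
    KV<String, String> field = context.element().getValue();

    // get(field) returns a ReadableState; read() yields null if the entry is absent.
    String previous = mapStore.get(field.getKey()).read();

    // put() updates only this entry; the rest of the map is not read or rewritten here.
    mapStore.put(field.getKey(), field.getValue());

    if (previous != null) {
      context.output(KV.of(field.getKey(), previous));
    }
  }
}
```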

Any ideas?

Thank you,
Dmitry

Re: ValueState for Dataflow runner and MapState for others

Posted by Lukasz Cwik <lc...@google.com>.
It depends on how complex the DoFn is, but you should be able to share parts
of the implementation in a static method that both implementations invoke.
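One way to apply this, sketched here as an assumption rather than taken from the thread: write the logic once in a static method that works against a tiny interface, and let each DoFn adapt its own state type to that interface. StringMapStore, SharedLogic, upsert, and InMemoryStore are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical minimal view of one key's field -> value state. Each DoFn
// variant would implement it on top of its own state type.
interface StringMapStore {
  String get(String field); // returns null if the field is absent

  void put(String field, String value);
}

// Hypothetical shared logic, written once and invoked by both DoFn variants.
final class SharedLogic {
  private SharedLogic() {}

  // Stores the new value for the field and returns the value it replaced (or null).
  static String upsert(StringMapStore store, String field, String value) {
    String previous = store.get(field);
    store.put(field, value);
    return previous;
  }
}

// Plain in-memory implementation, handy for unit-testing SharedLogic without a runner.
final class InMemoryStore implements StringMapStore {
  private final Map<String, String> entries = new HashMap<>();

  @Override
  public String get(String field) {
    return entries.get(field);
  }

  @Override
  public void put(String field, String value) {
    entries.put(field, value);
  }
}
```

In the ValueState variant, get would look the field up in valueStore.read() (treating null as an empty map) and put would write back an updated copy with valueStore.write(...); in the MapState variant they map directly onto mapStore.get(field).read() and mapStore.put(field, value).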

On Mon, Nov 12, 2018 at 4:59 PM Dmitry Minaev <mi...@gmail.com> wrote:

> Yes, sure, that'll work; I'll just have to support two different
> implementations. I was hoping there was something more elegant.
> Thank you Lukasz, I appreciate the response!
>
> --
> Dmitry

Re: ValueState for Dataflow runner and MapState for others

Posted by Dmitry Minaev <mi...@gmail.com>.
Yes, sure, that'll work; I'll just have to support two different
implementations. I was hoping there was something more elegant.
Thank you Lukasz, I appreciate the response!

--
Dmitry

On Mon, Nov 12, 2018 at 2:02 PM Lukasz Cwik <lc...@google.com> wrote:

> Could you write two different implementations of the DoFn and put your
> processing logic in another function that both DoFns would invoke after
> accessing the state?
>
> Then during pipeline construction you could choose to apply the Map one or
> the Value one based upon which runner you're using.

Re: ValueState for Dataflow runner and MapState for others

Posted by Lukasz Cwik <lc...@google.com>.
Could you write two different implementations of the DoFn and put your
processing logic in another function that both DoFns would invoke after
accessing the state?

Then during pipeline construction you could choose to apply the Map one or
the Value one based upon which runner you're using.
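A minimal sketch of that pipeline-construction step, assuming the two DoFn variants share the same input and output types. StateVariantSelector, isDataflowRunner, and choose are hypothetical names; the check compares the runner's simple class name so the code does not need the Dataflow runner module on its classpath.

```java
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// Hypothetical helper that picks which stateful DoFn to apply while the
// pipeline is being constructed.
final class StateVariantSelector {
  private StateVariantSelector() {}

  // Compares simple class names so this code does not need the Dataflow runner
  // module on its classpath. Assumption: the runners of interest are
  // DataflowRunner and TestDataflowRunner.
  static boolean isDataflowRunner(Class<?> runnerClass) {
    String name = runnerClass.getSimpleName();
    return name.equals("DataflowRunner") || name.equals("TestDataflowRunner");
  }

  // Uses the ValueState<Map> variant on Dataflow and the MapState variant elsewhere.
  // Both DoFns must have the same input and output types.
  static <InputT, OutputT> ParDo.SingleOutput<InputT, OutputT> choose(
      PipelineOptions options,
      DoFn<InputT, OutputT> valueStateFn,
      DoFn<InputT, OutputT> mapStateFn) {
    DoFn<InputT, OutputT> fn =
        isDataflowRunner(options.getRunner()) ? valueStateFn : mapStateFn;
    return ParDo.of(fn);
  }
}
```

Usage might look like input.apply(StateVariantSelector.choose(options, new MyValueStateFn(), new MyMapStateFn())), where MyValueStateFn and MyMapStateFn are hypothetical names for the two variants. If the Dataflow runner module is already a dependency, comparing options.getRunner() against DataflowRunner.class directly avoids relying on class names.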