Posted to user@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2018/04/16 22:33:13 UTC

Re: [Google dataflow] Apache beam 2.4.0 causes exceptions with CombiningState

There's actually been a rename since that notion of CombiningState. It used
to be that BagState (just blind writes) and CombiningValueState (uses a
CombineFn) were both instances of CombiningState (any automatically
mergeable thing).

Now the names are BagState (blind writes) and CombiningState (uses a
CombineFn) which are instances of GroupingState (automatically mergeable -
you might wonder why we didn't call it MergeableState...)

Kenn
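
As a rough illustration of the current naming described above, here is a
minimal sketch in plain Java. The interfaces below are simplified stand-ins
that only borrow the names from the message; their method sets are trimmed
and are not the SDK's definitions. ListBag and SumState are hypothetical
helpers invented for this example.

```java
import java.util.ArrayList;
import java.util.List;

final class StateNamingSketch {

    /** Stand-in for the supertype: add inputs, read back a grouped or merged view. */
    interface GroupingState<InputT, OutputT> {
        void add(InputT input);
        OutputT read();
    }

    /** Stand-in for blind writes: every input is kept and read back as-is. */
    interface BagState<T> extends GroupingState<T, Iterable<T>> {}

    /** Stand-in for combined state: inputs are folded into an accumulator. */
    interface CombiningState<InputT, AccumT, OutputT> extends GroupingState<InputT, OutputT> {
        AccumT getAccum();
    }

    /** Hypothetical in-memory bag, used only for this illustration. */
    static final class ListBag<T> implements BagState<T> {
        private final List<T> items = new ArrayList<>();
        @Override public void add(T input) { items.add(input); }
        @Override public Iterable<T> read() { return new ArrayList<>(items); }
    }

    /** Hypothetical combining state that sums its inputs. */
    static final class SumState implements CombiningState<Integer, Long, Long> {
        private long sum;
        @Override public void add(Integer input) { sum += input; }
        @Override public Long getAccum() { return sum; }
        @Override public Long read() { return sum; }
    }

    public static void main(String[] args) {
        // Both kinds of state can be handled through the common supertype.
        GroupingState<Integer, Iterable<Integer>> bag = new ListBag<>();
        GroupingState<Integer, Long> sum = new SumState();
        for (int i = 1; i <= 3; i++) {
            bag.add(i);
            sum.add(i);
        }
        System.out.println(bag.read()); // prints [1, 2, 3]
        System.out.println(sum.read()); // prints 6
    }
}
```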

On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <re...@google.com> wrote:

> Out of curiosity, what are you using CombiningState for? I believe it is
> intended for use in merging windows (such as sessions); however, those
> windows are not yet supported with state.
>
> Reuven
>
> On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <ac...@brightcove.com>
> wrote:
>
>> Hi all,
>>
>> I recently updated my dataflow pipeline to 2.4.0 sdk and found that my
>> stateful DoFn with the following statespec is throwing
>> java.lang.UnsupportedOperationException.
>>
>> For reference the job information is:
>>
>>    - job-id: 2018-04-11_12_11_36-1181436984489583563
>>
>> The same code seems to work without problems in 2.3.0.
>>
>> // The StateSpec that Beam uses to work out the state type requirements at runtime.
>> @StateId("indexKeys")
>> private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>>> INDEX_KEYS_SPEC =
>>         StateSpecs.combining(new IndexStateCombineFn());
>>
>> The exception is:
>>
>> java.lang.UnsupportedOperationException
>>         java.util.AbstractMap.put(AbstractMap.java:209)
>>         com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
>>         com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
>>         com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
>>         com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
>>         com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
>>
>> The combine fn is:
>>
>>
>> import com.google.common.collect.Maps;
>> import com.google.protobuf.ByteString;
>> import org.apache.beam.sdk.transforms.Combine;
>> import org.apache.beam.sdk.values.KV;
>>
>> import java.util.Map;
>>
>> // this combiner ensures that we keep track of the most recent value of each key in the map
>> public class IndexStateCombineFn extends Combine.CombineFn<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>> {
>>     @Override
>>     public Map<String, KV<Long, ByteString>> createAccumulator() {
>>         return Maps.newHashMap();
>>     }
>>
>>     @Override
>>     public Map<String, KV<Long, ByteString>> addInput(Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
>>         String id = input.getKey();
>>         KV<Long, ByteString> indexKey = input.getValue();
>>         if (!accumulator.containsKey(id)) {
>>             accumulator.put(id, indexKey);
>>         } else {
>>             KV<Long, ByteString> prevVal = accumulator.get(id);
>>             if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
>>                 // input is newer than what we have in the map, store it
>>                 accumulator.put(id, indexKey);
>>             }
>>         }
>>         return accumulator;
>>     }
>>
>>     @Override
>>     public Map<String, KV<Long, ByteString>> mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
>>         Map<String, KV<Long, ByteString>> merged = null;
>>         for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
>>             if (merged == null) {
>>                 merged = accumulator;
>>             } else {
>>                 for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>                     String indexId = entry.getKey();
>>                     KV<Long, ByteString> v = entry.getValue();
>>                     if (!merged.containsKey(indexId)) {
>>                         merged.put(indexId, v);
>>                     } else {
>>                         KV<Long, ByteString> old = merged.get(indexId);
>>                         if (old.getKey() < v.getKey()) {
>>                             merged.put(indexId, v);
>>                         }
>>                     }
>>                 }
>>             }
>>         }
>>         return merged;
>>     }
>>
>>     @Override
>>     public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
>>         Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
>>         for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>>             output.put(entry.getKey(), entry.getValue().getValue());
>>         }
>>         return output;
>>     }
>> }
>>
>> The exception seems to suggest that WindmillStateInternals may be
>> returning an ImmutableMap, but I can’t say for sure. Based on the javadoc
>> for addInput, the accumulator should be mutable.
>>
>> Has anyone else seen this issue?
>>
>> — Ankur Chauhan
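
The trace above fails in java.util.AbstractMap.put, which throws
UnsupportedOperationException for any map that does not override put (for
example, the map returned by Collections.emptyMap()). One defensive option,
sketched below under stated assumptions, is to copy the accumulator before
writing to it. This is a plain-Java stand-in with no Beam dependency:
Map.Entry<Long, String> replaces KV<Long, ByteString>, and the class and
helper names are hypothetical. It is not a confirmed fix for the Dataflow
behavior reported here.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CopyingAccumulatorSketch {

    // Copy first, then write to the copy, so a read-only accumulator is never mutated.
    static Map<String, Map.Entry<Long, String>> addInput(
            Map<String, Map.Entry<Long, String>> accumulator,
            String id,
            Map.Entry<Long, String> candidate) {
        Map<String, Map.Entry<Long, String>> copy = new HashMap<>(accumulator);
        keepNewest(copy, id, candidate);
        return copy;
    }

    // Start from a fresh map: an empty input yields an empty map rather than null,
    // and none of the input maps is modified.
    static Map<String, Map.Entry<Long, String>> mergeAccumulators(
            Iterable<Map<String, Map.Entry<Long, String>>> accumulators) {
        Map<String, Map.Entry<Long, String>> merged = new HashMap<>();
        for (Map<String, Map.Entry<Long, String>> accumulator : accumulators) {
            for (Map.Entry<String, Map.Entry<Long, String>> e : accumulator.entrySet()) {
                keepNewest(merged, e.getKey(), e.getValue());
            }
        }
        return merged;
    }

    // Keep the entry with the larger Long key. Ties go to the candidate, matching
    // the "<=" in the posted addInput (the posted merge uses "<" instead).
    private static void keepNewest(
            Map<String, Map.Entry<Long, String>> target,
            String id,
            Map.Entry<Long, String> candidate) {
        target.merge(id, candidate, (old, neu) -> old.getKey() <= neu.getKey() ? neu : old);
    }

    public static void main(String[] args) {
        // A read-only map stands in for whatever accumulator the runner hands back.
        Map<String, Map.Entry<Long, String>> readOnly = Collections.emptyMap();
        Map<String, Map.Entry<Long, String>> acc =
                addInput(readOnly, "a", new SimpleImmutableEntry<>(1L, "v1"));
        acc = addInput(acc, "a", new SimpleImmutableEntry<>(2L, "v2"));
        System.out.println(acc.get("a").getValue()); // prints v2
    }
}
```

Copying on every addInput costs an allocation per element, so this trades
some throughput for not depending on the mutability of the map passed in.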
>>
>> --
>> You received this message because you are subscribed to the Google Groups
>> "dataflow-feedback" group.
>> To view this discussion on the web visit
>> https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com
>> <https://groups.google.com/a/google.com/d/msgid/dataflow-feedback/CAEFbgUQau3VZK3k6ss24KOu5Owim8zAmRQfXScrhC0KjzpL%2Bbg%40mail.gmail.com?utm_medium=email&utm_source=footer>
>> .
>>

Re: [Google dataflow] Apache beam 2.4.0 causes exceptions with CombiningState

Posted by Kenneth Knowles <kl...@google.com>.
If you are mutating accumulators, you might instead blind-write the inputs
and let the system manage the combining. I'd have to see the body of
@ProcessElement to say more.

Kenn
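
One possible reading of the suggestion above, sketched in plain Java with no
Beam dependency: append every input without reading or touching an
accumulator, and compute the combined view only when the state is read. The
list below is a hypothetical stand-in for a bag of blind writes, and the
class, field, and method names are invented for this illustration. How this
maps onto a real @ProcessElement depends on code not shown in the thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BlindWriteSketch {

    /** One appended input: an id, a version used for ordering, and a value. */
    static final class Write {
        final String id;
        final long version;
        final String value;

        Write(String id, long version, String value) {
            this.id = id;
            this.version = version;
            this.value = value;
        }
    }

    // Hypothetical stand-in for a bag of blind writes; not a Beam type.
    private final List<Write> bag = new ArrayList<>();

    // Blind write: append only, without reading or mutating any accumulator.
    void add(String id, long version, String value) {
        bag.add(new Write(id, version, value));
    }

    // Combine on read: keep the value with the highest version for each id.
    Map<String, String> read() {
        Map<String, Write> newest = new HashMap<>();
        for (Write w : bag) {
            newest.merge(w.id, w, (old, neu) -> old.version <= neu.version ? neu : old);
        }
        Map<String, String> output = new HashMap<>();
        for (Write w : newest.values()) {
            output.put(w.id, w.value);
        }
        return output;
    }

    public static void main(String[] args) {
        BlindWriteSketch state = new BlindWriteSketch();
        state.add("a", 1L, "v1");
        state.add("a", 3L, "v3");
        state.add("a", 2L, "v2");
        System.out.println(state.read().get("a")); // prints v3
    }
}
```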
