Posted to user@beam.apache.org by Kenneth Knowles <kl...@google.com> on 2018/04/16 22:33:13 UTC
Re: [Google dataflow] Apache beam 2.4.0 causes exceptions with CombiningState
There's actually been a rename since that notion of CombiningState. It used
to be that BagState (just blind writes) and CombiningValueState (uses a
CombineFn) were both instances of CombiningState (any automatically
mergeable thing).
Now the names are BagState (blind writes) and CombiningState (uses a
CombineFn), which are both instances of GroupingState (automatically
mergeable; you might wonder why we didn't call it MergeableState...)
Kenn
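To illustrate the naming described above, here is a toy, in-memory model in plain Java. It is not Beam code: `ToyGroupingState`, `ToyBagState`, `ToyCombiningState` and `ToyCombiner` are hypothetical types that only imitate the shape of Beam's `GroupingState`, `BagState`, `CombiningState` and `CombineFn`. The sketch shows that both kinds of state share the same write/read contract (`add` and `read`), and that the combining variant differs only in folding each write into an accumulator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Toy model only: these types imitate the shape of Beam's state interfaces
// (GroupingState, BagState, CombiningState) but are NOT Beam classes.
interface ToyGroupingState<InputT, OutputT> {
  void add(InputT value); // caller-side blind write
  OutputT read();         // materialize the grouped result
}

// Hypothetical stand-in for a CombineFn: create / add / extract.
interface ToyCombiner<InputT, AccumT, OutputT> {
  AccumT createAccumulator();
  AccumT addInput(AccumT accum, InputT input);
  OutputT extractOutput(AccumT accum);
}

// "BagState"-like: every write is kept; reading returns all of them.
class ToyBagState<T> implements ToyGroupingState<T, Iterable<T>> {
  private final List<T> values = new ArrayList<>();

  @Override
  public void add(T value) {
    values.add(value);
  }

  @Override
  public Iterable<T> read() {
    return Collections.unmodifiableList(values);
  }
}

// "CombiningState"-like: each write is folded into a single accumulator.
class ToyCombiningState<InputT, AccumT, OutputT>
    implements ToyGroupingState<InputT, OutputT> {
  private final ToyCombiner<InputT, AccumT, OutputT> combiner;
  private AccumT accum;

  ToyCombiningState(ToyCombiner<InputT, AccumT, OutputT> combiner) {
    this.combiner = combiner;
    this.accum = combiner.createAccumulator();
  }

  @Override
  public void add(InputT value) {
    accum = combiner.addInput(accum, value);
  }

  @Override
  public OutputT read() {
    return combiner.extractOutput(accum);
  }
}
```

The trade-off: a bag retains every write until it is cleared, while a combining state retains only the accumulator.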
On Mon, Apr 16, 2018 at 12:14 PM Reuven Lax <re...@google.com> wrote:
> Out of curiosity, what are you using CombiningState for? I believe it is
> intended for use in merging windows (such as sessions); however, those
> windows are not yet supported with state.
>
> Reuven
>
> On Fri, Apr 13, 2018 at 2:42 AM Ankur Chauhan <ac...@brightcove.com>
> wrote:
>
>> Hi all,
>>
>> I recently updated my Dataflow pipeline to the 2.4.0 SDK and found that
>> my stateful DoFn with the following StateSpec throws
>> java.lang.UnsupportedOperationException.
>>
>> For reference, the job information is:
>>
>> - job-id: 2018-04-11_12_11_36-1181436984489583563
>>
>> The same code seems to work correctly (i.e. without problems) in 2.3.0.
>>
>> @StateId("indexKeys")
>> // this is the state spec needed by beam to figure out the state spec / type requirements at runtime
>> private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>>> INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());
>>
>> The exception is:
>>
>> java.lang.UnsupportedOperationException
>> java.util.AbstractMap.put(AbstractMap.java:209)
>> com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
>> com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
>> com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
>> com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
>>
>> The combine fn is:
>>
>>
>> import com.google.common.collect.Maps;
>> import com.google.protobuf.ByteString;
>> import org.apache.beam.sdk.transforms.Combine;
>> import org.apache.beam.sdk.values.KV;
>>
>> import java.util.Map;
>>
>> // this combiner keeps track of the most recent value of each key in the map
>> public class IndexStateCombineFn extends Combine.CombineFn<KV<String, KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String, ByteString>> {
>> @Override
>> public Map<String, KV<Long, ByteString>> createAccumulator() {
>> return Maps.newHashMap();
>> }
>>
>> @Override
>> public Map<String, KV<Long, ByteString>> addInput(Map<String, KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>> input) {
>> String id = input.getKey();
>> KV<Long, ByteString> indexKey = input.getValue();
>> if (!accumulator.containsKey(id)) {
>> accumulator.put(id, indexKey);
>> } else {
>> KV<Long, ByteString> prevVal = accumulator.get(id);
>> if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
>> // input is newer than what we have in the map, store it
>> accumulator.put(id, indexKey);
>> }
>> }
>> return accumulator;
>> }
>>
>> @Override
>> public Map<String, KV<Long, ByteString>> mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
>> Map<String, KV<Long, ByteString>> merged = null;
>> for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
>> if (merged == null) {
>> merged = accumulator;
>> } else {
>> for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>> String indexId = entry.getKey();
>> KV<Long, ByteString> v = entry.getValue();
>> if (!merged.containsKey(indexId)) {
>> merged.put(indexId, v);
>> } else {
>> KV<Long, ByteString> old = merged.get(indexId);
>> if (old.getKey() < v.getKey()) {
>> merged.put(indexId, v);
>> }
>> }
>> }
>> }
>> }
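>> // an empty Iterable would otherwise return null; fall back to a fresh accumulator
>> return merged != null ? merged : createAccumulator();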
>> }
>>
>> @Override
>> public Map<String, ByteString> extractOutput(Map<String, KV<Long, ByteString>> accumulator) {
>> Map<String, ByteString> output = Maps.newHashMapWithExpectedSize(accumulator.size());
>> for (Map.Entry<String, KV<Long, ByteString>> entry : accumulator.entrySet()) {
>> output.put(entry.getKey(), entry.getValue().getValue());
>> }
>> return output;
>> }
>> }
>>
>> The exception seems to indicate that WindmillStateInternals may be
>> returning an ImmutableMap, but I can’t say for sure. Based on the javadoc
>> for addInput, the accumulator should be mutable.
>>
>> Has anyone else seen this issue?
>>
>> — Ankur Chauhan
>>
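Ankur's guess is that the runner hands `addInput` an accumulator that rejects writes. The trace only shows the call failing inside `java.util.AbstractMap.put`, whose default implementation always throws `UnsupportedOperationException`, so any map that inherits it without overriding `put` (for example the one returned by `Collections.emptyMap()`) is read-only; which map Dataflow actually passed in is not known from this thread. One defensive workaround, assuming a per-input copy is an acceptable cost, is to never write into a map the combiner did not create. The plain-Java sketch below keeps the same "newest value per key" semantics as `IndexStateCombineFn`; `Versioned` is a hypothetical stand-in for `KV<Long, ByteString>` so that it compiles without Beam on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KV<Long, ByteString>: a version number plus a payload.
final class Versioned {
  final long version;
  final String payload;

  Versioned(long version, String payload) {
    this.version = version;
    this.payload = payload;
  }
}

// Same "newest value per key" logic as IndexStateCombineFn, but it never
// writes into a map it did not create, so a read-only accumulator is safe.
final class LatestPerKey {
  static Map<String, Versioned> createAccumulator() {
    return new HashMap<>();
  }

  static Map<String, Versioned> addInput(
      Map<String, Versioned> accumulator, String key, Versioned value) {
    // Defensive copy: only reads from the incoming accumulator.
    Map<String, Versioned> result = new HashMap<>(accumulator);
    Versioned previous = result.get(key);
    if (previous == null || previous.version <= value.version) {
      result.put(key, value); // input is at least as new: keep it
    }
    return result;
  }

  static Map<String, Versioned> mergeAccumulators(
      Iterable<Map<String, Versioned>> accumulators) {
    Map<String, Versioned> merged = createAccumulator(); // fresh, never null
    for (Map<String, Versioned> accumulator : accumulators) {
      for (Map.Entry<String, Versioned> entry : accumulator.entrySet()) {
        Versioned old = merged.get(entry.getKey());
        if (old == null || old.version < entry.getValue().version) {
          merged.put(entry.getKey(), entry.getValue());
        }
      }
    }
    return merged;
  }
}
```

Copying costs O(n) per input, so this only makes sense while the per-key maps stay small; it trades the in-place mutation the CombineFn javadoc permits for robustness against read-only accumulators.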
Re: [Google dataflow] Apache beam 2.4.0 causes exceptions with CombiningState
Posted by Kenneth Knowles <kl...@google.com>.
If you are mutating accumulators, you might instead blind-write the inputs
and let the system manage the combining. I'd have to see the body of your
@ProcessElement method to say more.
Kenn
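The blind-write suggestion can be sketched as follows: instead of a CombiningState, keep every input in a bag (writes only append, with no read-modify-write of an accumulator) and compute the newest value per key only when the state is read. This is plain Java under stated assumptions: an `ArrayList` stands in for a Beam `BagState`, and `IndexUpdate` and `BlindWriteIndex` are hypothetical names standing in for `KV<String, KV<Long, ByteString>>` and the DoFn's state handling. In a real DoFn the writes would go through `BagState.add(...)` and the read would iterate `BagState.read()`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for KV<String, KV<Long, ByteString>>.
final class IndexUpdate {
  final String id;
  final long version;
  final String payload;

  IndexUpdate(String id, long version, String payload) {
    this.id = id;
    this.version = version;
    this.payload = payload;
  }
}

// Blind-write buffer: add() only appends; nothing is read or mutated in place.
final class BlindWriteIndex {
  private final List<IndexUpdate> bag = new ArrayList<>(); // stands in for BagState

  void add(IndexUpdate update) {
    bag.add(update);
  }

  // Resolve the newest payload per id at read time (ties go to the later write).
  Map<String, String> latestPayloads() {
    Map<String, IndexUpdate> newest = new HashMap<>();
    for (IndexUpdate update : bag) {
      IndexUpdate prev = newest.get(update.id);
      if (prev == null || prev.version <= update.version) {
        newest.put(update.id, update);
      }
    }
    Map<String, String> out = new HashMap<>();
    for (Map.Entry<String, IndexUpdate> entry : newest.entrySet()) {
      out.put(entry.getKey(), entry.getValue().payload);
    }
    return out;
  }
}
```

The cost of this design is that the bag keeps every write, so a real pipeline would likely clear it (Beam state objects expose `clear()`) once the resolved values have been emitted.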