You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by Ankur Chauhan <an...@malloc64.com> on 2018/04/13 02:48:09 UTC

[beam 2.4.0] [google dataflow service] Stateful DoFn with combining state fails

Hi all,

I recently updated my dataflow pipeline to 2.4.0 sdk and found that my
stateful DoFn with the following statespec is throwing
java.lang.UnsupportedOperationException.

For reference the job information is:

   - job-id: 2018-04-11_12_11_36-1181436984489583563

The same code seems to work correctly i.e. without problems in 2.3.0

@StateId("indexKeys")
        // this is the state spec needed by beam to figure out the
state spec / type requirements at runtime
        private final StateSpec<CombiningState<KV<String, KV<Long,
ByteString>>, Map<String, KV<Long, ByteString>>, Map<String,
ByteString>>> INDEX_KEYS_SPEC = StateSpecs.combining(new
IndexStateCombineFn());

The exception is:

java.lang.UnsupportedOperationException
        java.util.AbstractMap.put(AbstractMap.java:209)
        com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
        com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
        com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
        com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
        com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)


The combine fn is:

import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;

import java.util.Map;

// this combiner ensures that we keep track of the most value of each
key in the map
public class IndexStateCombineFn extends Combine.CombineFn<KV<String,
KV<Long, ByteString>>, Map<String, KV<Long, ByteString>>, Map<String,
ByteString>> {
    @Override
    public Map<String, KV<Long, ByteString>> createAccumulator() {
        return Maps.newHashMap();
    }

    @Override
    public Map<String, KV<Long, ByteString>> addInput(Map<String,
KV<Long, ByteString>> accumulator, KV<String, KV<Long, ByteString>>
input) {
        String id = input.getKey();
        KV<Long, ByteString> indexKey = input.getValue();
        if (!accumulator.containsKey(id)) {
            accumulator.put(id, indexKey);
        } else {
            KV<Long, ByteString> prevVal = accumulator.get(id);
            if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
                // input is newer than what we have in the map, store it
                accumulator.put(id, indexKey);
            }
        }
        return accumulator;
    }

    @Override
    public Map<String, KV<Long, ByteString>>
mergeAccumulators(Iterable<Map<String, KV<Long, ByteString>>>
accumulators) {
        Map<String, KV<Long, ByteString>> merged = null;
        for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
            if (merged == null) {
                merged = accumulator;
            } else {
                for (Map.Entry<String, KV<Long, ByteString>> entry :
accumulator.entrySet()) {
                    String indexId = entry.getKey();
                    KV<Long, ByteString> v = entry.getValue();
                    if (!merged.containsKey(indexId)) {
                        merged.put(indexId, v);
                    } else {
                        KV<Long, ByteString> old = merged.get(indexId);
                        if (old.getKey() < v.getKey()) {
                            merged.put(indexId, v);
                        }
                    }
                }
            }
        }
        return merged;
    }

    @Override
    public Map<String, ByteString> extractOutput(Map<String, KV<Long,
ByteString>> accumulator) {
        Map<String, ByteString> output =
Maps.newHashMapWithExpectedSize(accumulator.size());
        for (Map.Entry<String, KV<Long, ByteString>> entry :
accumulator.entrySet()) {
            output.put(entry.getKey(), entry.getValue().getValue());
        }
        return output;
    }
}

The exception seems to point that WindmillStateInternals may be returning
an ImmutableMap but I can’t say for sure. Based on the javadoc for
addInput, the accumulator should be mutable.

Has anyone else seen this issue?

— Ankur Chauhan