Posted to user@beam.apache.org by Ankur Chauhan <an...@malloc64.com> on 2018/04/13 02:48:09 UTC
[beam 2.4.0] [google dataflow service] Stateful DoFn with combining state fails
Hi all,
I recently upgraded my Dataflow pipeline to the Beam 2.4.0 SDK and found
that my stateful DoFn with the state spec shown below now throws
java.lang.UnsupportedOperationException.
For reference the job information is:
- job-id: 2018-04-11_12_11_36-1181436984489583563
The same code appears to work without problems on 2.3.0.
@StateId("indexKeys")
// this is the state spec needed by beam to figure out the
// state spec / type requirements at runtime
private final StateSpec<CombiningState<KV<String, KV<Long, ByteString>>,
        Map<String, KV<Long, ByteString>>, Map<String, ByteString>>>
    INDEX_KEYS_SPEC = StateSpecs.combining(new IndexStateCombineFn());
The exception is:
java.lang.UnsupportedOperationException
java.util.AbstractMap.put(AbstractMap.java:209)
com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:22)
com.brightcove.rna.tools.index.IndexStateCombineFn.addInput(IndexStateCombineFn.java:11)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.add(WindmillStateInternals.java:920)
com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.emitIndexKeys(GenerateMutationsFn.java:195)
com.brightcove.rna.transforms.functions.GenerateMutationsFn$GenerateMutationSpeculatingFn.processElement(GenerateMutationsFn.java:160)
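One detail in the trace above may help narrow this down: the top frame is
AbstractMap.put itself, not a put method on a concrete map class.
AbstractMap.put throws unconditionally, so the accumulator is presumably
some AbstractMap subclass that never overrides put. The sketch below
reproduces the same top frame using only JDK maps. Collections.emptyMap()
is just a stand-in chosen for illustration, not a claim about what the
Dataflow worker actually returns, and the class and method names are
hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class AbstractMapPutDemo {
    // Tries to put into the given map and reports which class.method threw,
    // or "no exception" if the put succeeded.
    static String throwingFrame(Map<String, Long> map) {
        try {
            map.put("k", 1L);
            return "no exception";
        } catch (UnsupportedOperationException e) {
            StackTraceElement top = e.getStackTrace()[0];
            return top.getClassName() + "." + top.getMethodName();
        }
    }

    public static void main(String[] args) {
        // The JDK's empty map extends AbstractMap and inherits its put.
        System.out.println(throwingFrame(Collections.<String, Long>emptyMap()));
        // A HashMap overrides put, so nothing is thrown.
        System.out.println(throwingFrame(new HashMap<>()));
    }
}
```

On a standard JDK the first call should report java.util.AbstractMap.put,
matching the top frame in the trace.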
The combine fn is:
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.values.KV;
import java.util.Map;
// this combiner ensures that we keep track of the most recent value
// of each key in the map
public class IndexStateCombineFn extends Combine.CombineFn<
        KV<String, KV<Long, ByteString>>,
        Map<String, KV<Long, ByteString>>,
        Map<String, ByteString>> {

    @Override
    public Map<String, KV<Long, ByteString>> createAccumulator() {
        return Maps.newHashMap();
    }

    @Override
    public Map<String, KV<Long, ByteString>> addInput(
            Map<String, KV<Long, ByteString>> accumulator,
            KV<String, KV<Long, ByteString>> input) {
        String id = input.getKey();
        KV<Long, ByteString> indexKey = input.getValue();
        if (!accumulator.containsKey(id)) {
            accumulator.put(id, indexKey);
        } else {
            KV<Long, ByteString> prevVal = accumulator.get(id);
            if (prevVal == null || prevVal.getKey() <= indexKey.getKey()) {
                // input is newer than what we have in the map, store it
                accumulator.put(id, indexKey);
            }
        }
        return accumulator;
    }

    @Override
    public Map<String, KV<Long, ByteString>> mergeAccumulators(
            Iterable<Map<String, KV<Long, ByteString>>> accumulators) {
        Map<String, KV<Long, ByteString>> merged = null;
        for (Map<String, KV<Long, ByteString>> accumulator : accumulators) {
            if (merged == null) {
                merged = accumulator;
            } else {
                for (Map.Entry<String, KV<Long, ByteString>> entry :
                        accumulator.entrySet()) {
                    String indexId = entry.getKey();
                    KV<Long, ByteString> v = entry.getValue();
                    if (!merged.containsKey(indexId)) {
                        merged.put(indexId, v);
                    } else {
                        KV<Long, ByteString> old = merged.get(indexId);
                        if (old.getKey() < v.getKey()) {
                            merged.put(indexId, v);
                        }
                    }
                }
            }
        }
        return merged;
    }

    @Override
    public Map<String, ByteString> extractOutput(
            Map<String, KV<Long, ByteString>> accumulator) {
        Map<String, ByteString> output =
            Maps.newHashMapWithExpectedSize(accumulator.size());
        for (Map.Entry<String, KV<Long, ByteString>> entry :
                accumulator.entrySet()) {
            output.put(entry.getKey(), entry.getValue().getValue());
        }
        return output;
    }
}
The stack trace suggests that WindmillStateInternals may be passing an
ImmutableMap (or some other unmodifiable map) to addInput as the
accumulator, but I can’t say for sure. Based on the javadoc for addInput,
the accumulator should be mutable.
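One workaround that could be tried, assuming the accumulator handed to
addInput really is unmodifiable, is to copy it into a fresh HashMap before
mutating. The sketch below uses only JDK types so it runs without Beam on
the classpath: Map.Entry<Long, String> stands in for KV<Long, ByteString>,
and the class and method names are hypothetical. It pays for one copy per
element, so it is more of a diagnostic aid than a proper fix.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class DefensiveAddInputSketch {
    // Applies the same "keep the entry with the highest timestamp" rule as
    // addInput, but never mutates the accumulator it was given.
    static Map<String, Map.Entry<Long, String>> addInput(
            Map<String, Map.Entry<Long, String>> accumulator,
            String id,
            Map.Entry<Long, String> indexKey) {
        // Copy first, so an unmodifiable accumulator cannot throw on put.
        Map<String, Map.Entry<Long, String>> copy = new HashMap<>(accumulator);
        Map.Entry<Long, String> prev = copy.get(id);
        if (prev == null || prev.getKey() <= indexKey.getKey()) {
            copy.put(id, indexKey);
        }
        return copy;
    }

    public static void main(String[] args) {
        // Start from an unmodifiable map to mimic the suspected failure case.
        Map<String, Map.Entry<Long, String>> acc = Collections.emptyMap();
        acc = addInput(acc, "a", new SimpleImmutableEntry<>(1L, "old"));
        acc = addInput(acc, "a", new SimpleImmutableEntry<>(2L, "new"));
        acc = addInput(acc, "a", new SimpleImmutableEntry<>(0L, "stale"));
        System.out.println(acc.get("a").getValue()); // prints "new"
    }
}
```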
Has anyone else seen this issue?
— Ankur Chauhan