Posted to commits@beam.apache.org by "Victor Kotai (JIRA)" <ji...@apache.org> on 2017/12/12 16:40:00 UTC
[jira] [Created] (BEAM-3334) NullPointerException in Direct Runner
Victor Kotai created BEAM-3334:
----------------------------------
Summary: NullPointerException in Direct Runner
Key: BEAM-3334
URL: https://issues.apache.org/jira/browse/BEAM-3334
Project: Beam
Issue Type: Bug
Components: runner-direct
Affects Versions: 2.2.0
Reporter: Victor Kotai
Assignee: Thomas Groh
We're currently migrating one of our pipelines from dataflow-sdk-1.9.1 to beam-2.2.0. To validate the pipeline's behavior, we have several integration tests that read a file containing TableRows in JSON representation and then run the pipeline transformations on the resulting PCollection.
Example of the rows:
```
{"id":"firstId","type":"messageType","ts":"1500907784699", "message":"someMessage"}
{"id":"secondId","type":"messageType","ts":"1500907964217", "message":null}
```
In the test, the pipeline is set up to use the `DirectRunner`; it reads a PCollection from a file and applies some transformations.
The code to read from a file now looks like this:
```
public static PCollection<TableRow> readRowsFromFile(Pipeline pipeline, String filePath) {
    // Read each line of the file as a String, then parse it into a TableRow.
    return pipeline.apply(TextIO.read().from(Utils.absolutePathOf(filePath)))
        .apply(ParDo.of(new ParseTableRowFromJson()));
}
}
```
where `ParseTableRowFromJson` is:
```
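import static org.apache.beam.sdk.coders.Coder.Context.OUTER;

import com.google.api.services.bigquery.model.TableRow;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParseTableRowFromJson extends DoFn<String, TableRow> {
    private static final Logger LOG = LoggerFactory.getLogger(ParseTableRowFromJson.class);

    @ProcessElement
    public void processElement(ProcessContext processContext) {
        String input = processContext.element();
        try {
            // Decode one JSON line into a TableRow using Beam's TableRow coder.
            ByteArrayInputStream inStream = new ByteArrayInputStream(input.getBytes("UTF-8"));
            processContext.output(TableRowJsonCoder.of().decode(inStream, OUTER));
        } catch (IOException e) {
            // Malformed lines are logged and skipped rather than failing the bundle.
            LOG.warn("Failed parsing tableRow json: {}", input);
        }
    }
}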
```
or, with an alternative implementation that does essentially the same thing:
```
private static final ObjectMapper MAPPER = new ObjectMapper();

@ProcessElement
public void processElement(ProcessContext processContext) {
    String input = processContext.element();
    try {
        processContext.output(MAPPER.readValue(input, TableRow.class));
    } catch (IOException e) {
        LOG.warn("Failed parsing tableRow json: {}", input);
    }
}
```
When we run this on the given input we get the following:
```
Caused by: java.lang.NullPointerException
at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
at java.util.AbstractMap.hashCode(AbstractMap.java:530)
at java.util.Arrays.hashCode(Arrays.java:4146)
at java.util.Objects.hash(Objects.java:128)
at org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow.hashCode(WindowedValue.java:284)
at java.util.HashMap.hash(HashMap.java:338)
at java.util.HashMap.get(HashMap.java:556)
at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:191)
at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:130)
at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:48)
at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:110)
at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
at ParseTableRowFromJson.processElement(ParseTableRowFromJson.java:22)
```
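The trace shows the hash being computed through a chain: `Objects.hash` → `Arrays.hashCode` → `AbstractMap.hashCode` (which sums the hash of every entry) → the entry's own `hashCode`. The sketch below reproduces that chain in plain Java, assuming a hypothetical one-entry map (`OneEntryMap`) whose entry (`BrokenEntry`) dereferences its value without a null check. These classes are illustrative stand-ins, not google-http-client or Beam code.

```java
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical reproduction of the NPE chain; not the real ArrayMap implementation.
class NullHashRepro {

  // Entry whose hashCode is not null-safe (illustrative stand-in).
  static final class BrokenEntry implements Map.Entry<String, Object> {
    private final String key;
    private final Object value;

    BrokenEntry(String key, Object value) {
      this.key = key;
      this.value = value;
    }

    @Override public String getKey() { return key; }
    @Override public Object getValue() { return value; }
    @Override public Object setValue(Object v) { throw new UnsupportedOperationException(); }

    // Throws NullPointerException when value == null.
    @Override public int hashCode() { return key.hashCode() ^ value.hashCode(); }
  }

  // AbstractMap.hashCode() sums entrySet() hashes, so it calls BrokenEntry.hashCode().
  static final class OneEntryMap extends AbstractMap<String, Object> {
    private final Map.Entry<String, Object> entry;

    OneEntryMap(String key, Object value) { this.entry = new BrokenEntry(key, value); }

    @Override
    public Set<Map.Entry<String, Object>> entrySet() {
      return Collections.singleton(entry);
    }
  }

  // Objects.hash -> Arrays.hashCode -> AbstractMap.hashCode -> entry hashCode.
  static boolean throwsNpe(Object messageValue) {
    try {
      Objects.hash(new OneEntryMap("message", messageValue));
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    System.out.println(throwsNpe("someMessage")); // false
    System.out.println(throwsNpe(null));          // true
  }
}
```

Only the row with a `null` value fails, which matches the observation that the first example row passes and the second one triggers the exception.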
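The NPE is caused by `ArrayMap$Entry.hashCode` (in google-http-java-client) not handling a `null` value, and it occurs on the TableRow whose `message` field is `null`. The `DirectRunner` hits it because its immutability check (`ImmutabilityCheckingBundleFactory`) adds every output element to a `HashMultimap`, which hashes the `WindowedValue` and, through it, every entry of the TableRow.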
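For reference, the `java.util.Map.Entry#hashCode` contract defines an entry's hash as `(key == null ? 0 : key.hashCode()) ^ (value == null ? 0 : value.hashCode())`, so a conforming entry must tolerate `null`. The sketch below shows that rule using `Objects.hashCode`, which returns 0 for `null`. The helper name `entryHash` is hypothetical, and this illustrates the kind of fix involved rather than the actual patch in the linked issue.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;

// Sketch of a null-tolerant entry hash following the Map.Entry#hashCode contract.
// Hypothetical helper; not the code from the google-http-java-client fix.
class NullSafeEntryHash {

  // Objects.hashCode(null) == 0, matching the contract's "null ? 0 : x.hashCode()" rule.
  static int entryHash(Object key, Object value) {
    return Objects.hashCode(key) ^ Objects.hashCode(value);
  }

  public static void main(String[] args) {
    // AbstractMap.SimpleEntry implements the contract, so the two hashes should agree.
    Map.Entry<String, Object> reference = new AbstractMap.SimpleEntry<>("message", null);
    System.out.println(entryHash("message", null) == reference.hashCode()); // true
  }
}
```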
I've submitted a PR to google-http-java-client that should fix this: [Issue here|https://github.com/google/google-http-java-client/issues/384]
I assume it's now a matter of the library cutting a release and Beam updating its dependency to that version? Or is there anything else that should be done?
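Until a fixed library version is picked up, one possible stopgap (not proposed in the report, and untested against `TableRow`) is to drop `null`-valued fields from each parsed row before calling `processContext.output(...)`, so no `null` entry reaches the runner's hashing. The sketch below works on a plain `Map<String, Object>`. It assumes that `TableRow`'s map views support removal in the same way and that downstream code treats a missing field like a `null` one: `get()` returns `null` in both cases, but `containsKey()` does not. The name `dropNulls` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stopgap: remove null-valued fields from a parsed row before emitting it.
// Assumption: downstream logic does not distinguish "absent" from "present but null".
class DropNullFields {

  static <M extends Map<String, Object>> M dropNulls(M row) {
    // Removing via the values() view deletes the corresponding map entries.
    row.values().removeIf(Objects::isNull);
    return row;
  }

  public static void main(String[] args) {
    Map<String, Object> row = new HashMap<>();
    row.put("id", "secondId");
    row.put("message", null);
    dropNulls(row);
    System.out.println(row.containsKey("message")); // false
    System.out.println(row.get("message"));         // null
  }
}
```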
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)