Posted to commits@beam.apache.org by "Victor Kotai (JIRA)" <ji...@apache.org> on 2017/12/12 16:40:00 UTC

[jira] [Created] (BEAM-3334) NullPointerException in Direct Runner

Victor Kotai created BEAM-3334:
----------------------------------

             Summary: NullPointerException in Direct Runner
                 Key: BEAM-3334
                 URL: https://issues.apache.org/jira/browse/BEAM-3334
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
    Affects Versions: 2.2.0
            Reporter: Victor Kotai
            Assignee: Thomas Groh


We're currently migrating one of our pipelines from dataflow-sdk-1.9.1 to beam-2.2.0. To validate the behavior of the flow, we have integration tests that read a file containing TableRows in JSON representation, then run the pipeline transformations on the resulting PCollection.

Example of the rows: 
```
{"id":"firstId","type":"messageType","ts":"1500907784699", "message":"someMessage"}
{"id":"secondId","type":"messageType","ts":"1500907964217", "message":null}
```

In the test, we've set up a pipeline to use the `DirectRunner`; the pipeline reads a PCollection from a file and applies some operations to it.

The code to read from a file now looks like this:
```
public static PCollection<TableRow> readRowsFromFile(Pipeline pipeline, String filePath) {
    return pipeline.apply(TextIO.read().from(Utils.absolutePathOf(filePath)))
            .apply(ParDo.of(new ParseTableRowFromJson()));
}
```
where `ParseTableRowFromJson` is:
```
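import static org.apache.beam.sdk.coders.Coder.Context.OUTER;

import com.google.api.services.bigquery.model.TableRow;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParseTableRowFromJson extends DoFn<String, TableRow> {
    private static final Logger LOG = LoggerFactory.getLogger(ParseTableRowFromJson.class);

    @ProcessElement
    public void processElement(ProcessContext processContext) {
        String input = processContext.element();
        try {
            // Decode one JSON line into a TableRow and emit it.
            ByteArrayInputStream inStream = new ByteArrayInputStream(input.getBytes("UTF-8"));
            processContext.output(TableRowJsonCoder.of().decode(inStream, OUTER));
        } catch (IOException e) {
            // Lines that fail to parse are logged and skipped.
            LOG.warn("Failed parsing tableRow json: {}", input);
        }
    }
}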
```
Or, with a different implementation that does essentially the same thing:
```
private static final ObjectMapper MAPPER = new ObjectMapper();

@ProcessElement
public void processElement(ProcessContext processContext) {
    String input = processContext.element();
    try {
        processContext.output(MAPPER.readValue(input, TableRow.class));
    } catch (IOException e) {
        LOG.warn("Failed parsing tableRow json: {}", input);
    }
}
```

When we run this on the example rows above, we get the following:
```
Caused by: java.lang.NullPointerException
	at com.google.api.client.util.ArrayMap$Entry.hashCode(ArrayMap.java:419)
	at java.util.AbstractMap.hashCode(AbstractMap.java:530)
	at java.util.Arrays.hashCode(Arrays.java:4146)
	at java.util.Objects.hash(Objects.java:128)
	at org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow.hashCode(WindowedValue.java:284)
	at java.util.HashMap.hash(HashMap.java:338)
	at java.util.HashMap.get(HashMap.java:556)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractMapBasedMultimap.put(AbstractMapBasedMultimap.java:191)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.AbstractSetMultimap.put(AbstractSetMultimap.java:130)
	at org.apache.beam.runners.direct.repackaged.com.google.common.collect.HashMultimap.put(HashMultimap.java:48)
	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:110)
	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:235)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
	at org.apache.beam.runners.direct.repackaged.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
	at ParseTableRowFromJson.processElement(ParseTableRowFromJson.java:22)
```

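The NPE comes from `ArrayMap$Entry.hashCode`, which does not handle `null` values. It occurs on the TableRow that contains a `null` value for `message`, when the DirectRunner hashes the output element as part of its immutability check (`ImmutabilityCheckingBundleFactory` in the trace above).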
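
To illustrate the failure mode, here is a minimal sketch using only `java.util`. It is not the actual library source: the classes `UnsafeEntry` and `SafeEntry` and the helper `hashThrowsNpe` are hypothetical names made up for this example. It assumes the problem is an entry `hashCode` that dereferences its value without a null check, and compares that against a variant that follows the `Map.Entry` contract (a `null` key or value hashes to 0).

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.Objects;

public class EntryHashSketch {

    /** Hypothetical entry whose hashCode dereferences the value without a null check. */
    static final class UnsafeEntry extends AbstractMap.SimpleEntry<String, Object> {
        UnsafeEntry(String key, Object value) {
            super(key, value);
        }

        @Override
        public int hashCode() {
            // Throws NullPointerException when getValue() is null.
            return getKey().hashCode() ^ getValue().hashCode();
        }
    }

    /** Hypothetical null-safe variant: a null key or value contributes 0 to the hash. */
    static final class SafeEntry extends AbstractMap.SimpleEntry<String, Object> {
        SafeEntry(String key, Object value) {
            super(key, value);
        }

        @Override
        public int hashCode() {
            return Objects.hashCode(getKey()) ^ Objects.hashCode(getValue());
        }
    }

    /** Returns true if hashing the entry via Objects.hash throws a NullPointerException. */
    static boolean hashThrowsNpe(Map.Entry<String, Object> entry) {
        try {
            // Objects.hash delegates to Arrays.hashCode, as in the stack trace above.
            Objects.hash(entry);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Mirrors the second example row, where "message" is null.
        System.out.println("unsafe entry throws: " + hashThrowsNpe(new UnsafeEntry("message", null)));
        System.out.println("safe entry throws: " + hashThrowsNpe(new SafeEntry("message", null)));
    }
}
```

With a non-null value, both variants compute the same hash; they only differ on rows like the second example row.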

I've submitted a PR to the library that should fix this: [Issue here|https://github.com/google/google-http-java-client/issues/384]

I would guess it's now a matter of the library publishing a release and Beam updating its dependency to that version? Or is there anything else that should be done?
