Posted to issues@beam.apache.org by "Beam JIRA Bot (Jira)" <ji...@apache.org> on 2020/11/01 17:11:01 UTC

[jira] [Updated] (BEAM-9267) A BeamSQL UDF that returns a Map fails always with NullPointerException.

     [ https://issues.apache.org/jira/browse/BEAM-9267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Beam JIRA Bot updated BEAM-9267:
--------------------------------
    Labels: stale-P2  (was: )

> A BeamSQL UDF that returns a Map fails always with NullPointerException.
> ------------------------------------------------------------------------
>
>                 Key: BEAM-9267
>                 URL: https://issues.apache.org/jira/browse/BEAM-9267
>             Project: Beam
>          Issue Type: Bug
>          Components: dsl-sql
>    Affects Versions: 2.19.0
>            Reporter: Niels Basjes
>            Priority: P2
>              Labels: stale-P2
>
> When I create a UDF that returns a Map<String, String> and call it from within a SQL statement, it consistently fails with a NullPointerException.
> My UDFs:
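> {code}
> import java.util.HashMap;
> import java.util.Map;
> import org.apache.beam.sdk.transforms.SerializableFunction;
>
> // Ignores its input and always returns the one-entry map {Some=Thing}.
> public class FooMap implements SerializableFunction<String, Map<String, String>> {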
>     @Override
>     public Map<String, String> apply(String input) {
>         final HashMap<String, String> hashMap = new HashMap<>();
>         hashMap.put("Some", "Thing");
>         return hashMap;
>     }
> }
> {code}
> and
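> {code}
> import org.apache.beam.sdk.transforms.SerializableFunction;
>
> // Reverses its input, e.g. "One" becomes "enO".
> public class BarString implements SerializableFunction<String, String> {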
>     @Override
>     public String apply(String input) {
>         return new StringBuilder(input).reverse().toString();
>     }
> }
> {code}
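> My test:
> {code}
> import java.io.Serializable;
> import java.util.Arrays;
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.extensions.sql.SqlTransform;
> import org.apache.beam.sdk.schemas.Schema;
> import org.apache.beam.sdk.testing.NeedsRunner;
> import org.apache.beam.sdk.testing.TestPipeline;
> import org.apache.beam.sdk.testing.ValidatesRunner;
> import org.apache.beam.sdk.transforms.Create;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.PCollection;
> import org.apache.beam.sdk.values.PCollectionTuple;
> import org.apache.beam.sdk.values.Row;
> import org.junit.Rule;
> import org.junit.Test;
> import org.junit.experimental.categories.Category;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @Category(ValidatesRunner.class)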
> public class TestFunctionReturnsMap implements Serializable {
>     private static final Logger LOG = LoggerFactory.getLogger(TestFunctionReturnsMap.class);
>     @Rule
>     public final transient TestPipeline pipeline = TestPipeline.create();
>     @Test
>     @Category(NeedsRunner.class)
>     public void testUserAgentAnalysisSQL() {
>         // ============================================================
>         // Create input PCollection<Row>
>         Schema inputSchema = Schema
>             .builder()
>             .addStringField("bar")
>             .build();
>         PCollection<Row> input = pipeline
>             .apply(Create.of(Arrays.asList("One", "Two", "Three")))
>             .setCoder(StringUtf8Coder.of())
>             .apply(ParDo.of(new DoFn<String, Row>() {
>                 @ProcessElement
>                 public void processElement(ProcessContext c) {
>                     c.output(Row
>                         .withSchema(inputSchema)
>                         .addValues(c.element())
>                         .build());
>                 }
>             })).setRowSchema(inputSchema);
>         // ============================================================
>         PCollection<Row> result =
>             // This way we give a name to the input stream for use in the SQL
>             PCollectionTuple.of("InputStream", input)
>                 // Apply the SQL with the UDFs we need.
>                 .apply("Execute SQL", SqlTransform
>                     .query(
>                         "SELECT" +
>                         "   bar             AS bar" +
>                         "  ,Bar(bar)        AS barbar " +
>                         "  ,Foo(bar)        AS foobar " +
>                         "FROM InputStream")
>                     .registerUdf("Foo",     new FooMap())
>                     .registerUdf("Bar",     new BarString())
>                 );
>         result.apply(ParDo.of(new RowPrinter()));
>         pipeline.run().waitUntilFinish();
>     }
>     public static class RowPrinter extends DoFn<Row, Row> {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             final Row row = c.element();
>             LOG.info("ROW: {} --> {}", row, row.getSchema());
>         }
>     }
> }
> {code}
> The exception I always get:
> {code}
> java.lang.NullPointerException: Null type
> 	at org.apache.beam.sdk.schemas.AutoValue_Schema_Field$Builder.setType(AutoValue_Schema_Field.java:84)
> 	at org.apache.beam.sdk.schemas.Schema$Field.of(Schema.java:893)
> 	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:234)
> 	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:230)
> 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
> 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
> 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> 	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:189)
> 	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:129)
> 	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:110)
> 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
> 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
> 	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:69)
> 	at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:39)
> 	at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
> 	at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
> 	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
> 	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
> 	at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
> 	at nl.basjes.parse.useragent.beam.TestFunctionReturnsMap.testUserAgentAnalysisSQL(TestFunctionReturnsMap.java:81)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> {code}
>  
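Reading the trace bottom-up, the failure happens inside SqlTransform.expand. That means it occurs while the pipeline is being constructed, before any element reaches the UDFs. At that point CalciteUtils.toSchema is converting the SQL output row type into a Beam Schema, and Schema.Field.of rejects one of the fields. The message "Null type" is what an AutoValue-generated builder throws when a required property (here `type`) is set to null. That suggests no Beam field type was found for one output column, presumably the `foobar` column produced by the Map-returning UDF. The sketch below is a hypothetical, dependency-free model of that path and is not Beam's code. The class names, the lookup table and the `JAVA_MAP` type name are all made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the failing path in the trace above.
// None of these classes are Beam APIs; they only mimic the shape of the problem.
public class NullTypeSketch {

    // Stand-in for a schema field: a column name plus a required type name.
    static final class Field {
        final String name;
        final String type;

        Field(String name, String type) {
            // Same null check (and message) an AutoValue builder setter performs.
            if (type == null) {
                throw new NullPointerException("Null type");
            }
            this.name = name;
            this.type = type;
        }
    }

    // Hypothetical SQL-to-field-type table with no entry for the UDF's Map type.
    static final Map<String, String> SQL_TO_FIELD_TYPE = new HashMap<>();

    static {
        SQL_TO_FIELD_TYPE.put("VARCHAR", "STRING");
    }

    // Map.get yields null for an unmapped SQL type, and that null is passed on
    // unchecked, mirroring what the reported trace suggests.
    static Field toField(String name, String sqlType) {
        return new Field(name, SQL_TO_FIELD_TYPE.get(sqlType));
    }

    public static void main(String[] args) {
        // A String-returning UDF column such as barbar converts fine.
        System.out.println(toField("barbar", "VARCHAR").type);
        try {
            // "JAVA_MAP" is a made-up name for whatever type the Map UDF produced.
            toField("foobar", "JAVA_MAP");
        } catch (NullPointerException e) {
            System.out.println("NullPointerException: " + e.getMessage());
        }
    }
}
```

If this reading is right, the UDF bodies themselves are not at fault. The gap lies in converting the UDF's declared return type into a Beam schema field type.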



--
This message was sent by Atlassian Jira
(v8.3.4#803005)