Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 14:50:58 UTC

[GitHub] [beam] damccorm opened a new issue, #20050: Support Enum in SqlTransform

damccorm opened a new issue, #20050:
URL: https://github.com/apache/beam/issues/20050

   I ran into this problem when trying to put my Avro records through the SqlTransform.
   
   I was able to reduce the reproduction path to the code below.
   
   This code fails on my machine (using Beam 2.19.0) with a ParseException whose root cause is the following NullPointerException:
   ```
   org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT name, direction FROM InputStream
   	at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:175)
   	at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:103)
   	at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
   	at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
   	at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
   	at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
   	at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
   	at com.bol.analytics.m2.TestAvro2SQL.testAvro2SQL(TestAvro2SQL.java:99)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
   	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
   	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
   	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
   	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
   	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
   	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
   	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
   	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
   	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
   	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
   	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
   	at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
   	at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
   	at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
   Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException: java.lang.NullPointerException
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:217)
   	at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:144)
   	... 31 more
   Caused by: java.lang.NullPointerException
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl.createSqlType(SqlTypeFactoryImpl.java:45)
   	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:280)
   	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:287)
   	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.lambda$toCalciteRowType$0(CalciteUtils.java:261)
   	at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
   	at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:581)
   	at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toCalciteRowType(CalciteUtils.java:258)
   	at org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable.getRowType(BeamCalciteTable.java:71)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
   	at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:215)
   	... 32 more
   ```
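
The innermost frames of the trace show the NullPointerException being raised in `SqlTypeFactoryImpl.createSqlType`, called from `CalciteUtils.toRelDataType` while the row type of the input table is being built. One plausible reading, which is an assumption and not confirmed against the Beam source, is that the enum field has no SQL type mapping, so a null type name is passed along. The sketch below is plain Java with hypothetical names (`FieldKind`, `SqlKind`, `toSqlTypeUnchecked`, `toSqlTypeChecked`); it is not Beam's or Calcite's code and only illustrates how an unmapped type can surface as a bare NullPointerException, and how an explicit check gives a clearer error.

```java
import java.util.EnumMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical stand-in for a schema-to-SQL type mapper; not Beam's actual code. */
public class EnumTypeMappingSketch {

    /** Hypothetical field kinds of an input schema. */
    enum FieldKind { STRING, INT32, ENUM }

    /** Hypothetical SQL type names known to the planner. */
    enum SqlKind { VARCHAR, INTEGER }

    // ENUM is deliberately left unmapped to mimic an unsupported type.
    private static final Map<FieldKind, SqlKind> MAPPING = new EnumMap<>(FieldKind.class);

    static {
        MAPPING.put(FieldKind.STRING, SqlKind.VARCHAR);
        MAPPING.put(FieldKind.INT32, SqlKind.INTEGER);
    }

    /** Dereferences the lookup result directly, so an unmapped kind fails with a bare NPE. */
    static String toSqlTypeUnchecked(FieldKind kind) {
        SqlKind sqlKind = MAPPING.get(kind); // null for ENUM
        return sqlKind.name().toLowerCase(Locale.ROOT);
    }

    /** Same lookup, but names the unsupported kind in the error. */
    static String toSqlTypeChecked(FieldKind kind) {
        SqlKind sqlKind = MAPPING.get(kind);
        if (sqlKind == null) {
            throw new UnsupportedOperationException(
                "No SQL type mapping for field kind " + kind);
        }
        return sqlKind.name().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        System.out.println(toSqlTypeChecked(FieldKind.STRING));
        try {
            toSqlTypeUnchecked(FieldKind.ENUM);
        } catch (NullPointerException e) {
            System.out.println("unchecked lookup failed with NullPointerException");
        }
        try {
            toSqlTypeChecked(FieldKind.ENUM);
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this reading is right, an error that names the unsupported field type would already make the problem far easier to diagnose, independent of whether enum support is added.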
   
    
   ```
       @Test
       @Category(NeedsRunner.class)
       public void testAvro2SQL() {
           // ============================================================
           // The base test input
           Schema testSchema = (new Schema.Parser()).parse(
               "{\"type\":\"record\",\"name\":\"Transport\"," +
               "\"fields\":[" +
               "{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null}," +
               "{\"name\":\"direction\",\"type\":{\"type\":\"enum\",\"name\":\"DirectionType\",\"symbols\":[\"PULL\",\"PUSH\"]}}" +
               "]}");

           GenericRecord record = new GenericRecordBuilder(testSchema)
               .set("name", "Test")
               .set("direction", new GenericData.EnumSymbol(testSchema.getField("direction").schema(), "PULL"))
               .build();

           // List of test Inputs
           List<GenericRecord> testRecords = Collections.singletonList(record);

           // ============================================================
           // Convert into a PCollection<Row>
           PCollection<Row> input = pipeline
               .apply(Create.of(testRecords).withCoder(AvroGenericCoder.of(testSchema)))
               .apply(ParDo.of(new DoFn<GenericRecord, Row>() {
                   @ProcessElement
                   public void processElement(ProcessContext c) {
                       c.output(toBeamRowStrict(c.element(), null));
                   }
               }))
               .setRowSchema(toBeamSchema(testSchema));

           // ============================================================
           PCollection<Row> result =
               // This way we give a name to the input stream for use in the SQL
               PCollectionTuple
                   .of("InputStream", input)
                   // Apply the SQL.
                   .apply("Execute SQL", SqlTransform
                       .query("SELECT" +
                               " name, direction " +
                               "FROM InputStream"));

           pipeline.run().waitUntilFinish();
       }
   ```
   
   
   Imported from Jira [BEAM-9361](https://issues.apache.org/jira/browse/BEAM-9361). Original Jira may contain additional context.
   Reported by: nielsbasjes.

