You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/04 14:50:58 UTC
[GitHub] [beam] damccorm opened a new issue, #20050: Support Enum in SqlTransform
damccorm opened a new issue, #20050:
URL: https://github.com/apache/beam/issues/20050
I ran into this problem when trying to put my Avro records through the SqlTransform.
I was able to reduce the reproduction path to the code below.
This code fails on my machine (using Beam 2.19.0) with the following NullPointerException
```
org.apache.beam.sdk.extensions.sql.impl.ParseException: Unable to parse query SELECT name, direction
FROM InputStream at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:175)
at
org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:103)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:125)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:83)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:539)
at
org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:490)
at org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:261)
at
com.bol.analytics.m2.TestAvro2SQL.testAvro2SQL(TestAvro2SQL.java:99)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native
Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at
java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:319)
at
org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at
org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at
org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at
org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at
com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at
com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)
Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException:
java.lang.NullPointerException
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:217)
at
org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:144)
...
31 more
Caused by: java.lang.NullPointerException
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.type.SqlTypeFactoryImpl.createSqlType(SqlTypeFactoryImpl.java:45)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:280)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toRelDataType(CalciteUtils.java:287)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.lambda$toCalciteRowType$0(CalciteUtils.java:261)
at
java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:581)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toCalciteRowType(CalciteUtils.java:258)
at
org.apache.beam.sdk.extensions.sql.impl.BeamCalciteTable.getRowType(BeamCalciteTable.java:71)
at org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:159)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:105)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:177)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3109)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3091)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3363)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:995)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:955)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:216)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:930)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:637)
at
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.prepare.PlannerImpl.validate(PlannerImpl.java:215)
...
32 more
```
```
@Test
@Category(NeedsRunner.class)
public void testAvro2SQL() {
// ============================================================
// The base test input
Schema testSchema = (new Schema.Parser()).parse(
"{\"type\":\"record\",\"name\":\"Transport\"," +
"\"fields\":[" +
"{\"name\":\"name\",\"type\":[\"null\",{\"type\":\"string\",\"avro.java.string\":\"String\"}],\"default\":null},"
+
"{\"name\":\"direction\",\"type\":{\"type\":\"enum\",\"name\":\"DirectionType\",\"symbols\":[\"PULL\",\"PUSH\"]}}"
+
"]}");
GenericRecord record = new GenericRecordBuilder(testSchema)
.set("name", "Test")
.set("direction", new GenericData.EnumSymbol(testSchema.getField("direction").schema(),
"PULL"))
.build();
// List of test Inputs
List<GenericRecord> testRecords
= Collections.singletonList(record);
// ============================================================
// Convert into a PCollection<Row>
PCollection<Row> input = pipeline
.apply(Create.of(testRecords).withCoder(AvroGenericCoder.of(testSchema)))
.apply(ParDo.of(new DoFn<GenericRecord, Row>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(toBeamRowStrict(c.element(),
null));
}
}))
.setRowSchema(toBeamSchema(testSchema));
// ============================================================
PCollection<Row> result
=
// This way we give a name to the input stream for use in the SQL
PCollectionTuple
.of("InputStream", input)
// Apply the SQL.
.apply("Execute
SQL", SqlTransform
.query("SELECT" +
" name, direction
" +
"FROM InputStream"));
pipeline.run().waitUntilFinish();
}
```
Imported from Jira [BEAM-9361](https://issues.apache.org/jira/browse/BEAM-9361). Original Jira may contain additional context.
Reported by: nielsbasjes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org