Posted to user@beam.apache.org by "Balogh, György" <bo...@ultinous.com> on 2023/10/12 18:19:04 UTC

Cannot find a matching Beam FieldType for Calcite type: REAL

Hi,
I'm using Beam 2.51.0.
I'm trying to use a UDF to transform float arrays and got the following error:

Exception in thread "main" java.lang.IllegalArgumentException: Cannot find a matching Beam FieldType for Calcite type: REAL
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
    at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
    at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)


This is my code:

public static class TestUDF implements SerializableFunction<List<Float>, Float> {
    @Override
    public Float apply(List<Float> fv) {
        float sum = 0;
        if (fv != null)
            for (Float a : fv)
                sum += a;
        return sum;
    }
}

public static void example6() {
    System.out.println("Example 6");

    Schema rowSchema = Schema.builder()
            .addField("ind", Schema.FieldType.INT32)
            .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
            .build();

    Pipeline p = createPipeline();
    p
            .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
            .apply(ParDo.of(new DoFn<Integer, Row>() {
                @ProcessElement
                public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
                    Row.Builder rowBuilder = Row.withSchema(rowSchema);
                    List<Float> fv = new ArrayList<Float>();
                    fv.add(1f * ind);
                    fv.add(2f * ind);
                    Row row = rowBuilder
                            .addValue(ind)
                            .addValue(fv)
                            .build();
                    out.output(row);
                }
            }))
            .setRowSchema(rowSchema)
            .apply(
                    SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
                            .registerUdf("testUDF", new TestUDF())
            )
            .apply(new Print());
    p.run().waitUntilFinish();
}


-- 

György Balogh
CTO
E gyorgy.balogh@ultinous.com
M +36 30 270 8342
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com

Re: Cannot find a matching Beam FieldType for Calcite type: REAL

Posted by Alexey Romanenko <ar...@gmail.com>.
It seems that Calcite uses REAL for Float values in the SQL transform, while Beam SQL (if I'm not mistaken) has no conversion from the SQL REAL type to any Beam schema field type.

A workaround could be to add such a conversion (REAL -> FLOAT) to CalciteUtils.java.
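
To illustrate the idea only, here is a minimal, self-contained sketch of a type lookup that fails on REAL and succeeds once a REAL -> FLOAT entry is added. It is a toy model and not Beam's actual CalciteUtils code: the class TypeMappingSketch, the enums SqlType and FieldType, and the methods baseMapping, patchedMapping and toFieldType are all hypothetical names invented for this example.

```java
import java.util.EnumMap;
import java.util.Map;

/** Toy model of a SQL-type to schema-type lookup. NOT Beam's real CalciteUtils. */
final class TypeMappingSketch {

    // Hypothetical stand-ins for Calcite SQL type names and Beam schema field types.
    enum SqlType { INTEGER, FLOAT, REAL, DOUBLE }
    enum FieldType { INT32, FLOAT, DOUBLE }

    /** Mapping without an entry for REAL, mirroring the reported failure. */
    static Map<SqlType, FieldType> baseMapping() {
        Map<SqlType, FieldType> mapping = new EnumMap<>(SqlType.class);
        mapping.put(SqlType.INTEGER, FieldType.INT32);
        mapping.put(SqlType.FLOAT, FieldType.FLOAT);
        mapping.put(SqlType.DOUBLE, FieldType.DOUBLE);
        return mapping;
    }

    /** Same mapping plus the suggested REAL -> FLOAT conversion. */
    static Map<SqlType, FieldType> patchedMapping() {
        Map<SqlType, FieldType> mapping = baseMapping();
        mapping.put(SqlType.REAL, FieldType.FLOAT);
        return mapping;
    }

    /** Looks up a field type and throws if the SQL type has no entry. */
    static FieldType toFieldType(Map<SqlType, FieldType> mapping, SqlType type) {
        FieldType fieldType = mapping.get(type);
        if (fieldType == null) {
            throw new IllegalArgumentException(
                "Cannot find a matching Beam FieldType for Calcite type: " + type);
        }
        return fieldType;
    }

    public static void main(String[] args) {
        try {
            toFieldType(baseMapping(), SqlType.REAL);
        } catch (IllegalArgumentException e) {
            System.out.println("before: " + e.getMessage());
        }
        System.out.println("after: " + toFieldType(patchedMapping(), SqlType.REAL));
    }
}
```

In the real code base the change would have to go wherever CalciteUtils resolves Calcite types to Beam field types; the exact location and shape of that code are not shown here.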

—
Alexey
