You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by "Balogh, György" <bo...@ultinous.com> on 2023/10/12 18:19:04 UTC
Cannot find a matching Beam FieldType for Calcite type: REAL
Hi,
I'm using Beam 2.51.0.
I'm trying to use a UDF to transform float arrays and I get the following error:
Exception in thread "main" java.lang.IllegalArgumentException: Cannot find
a matching Beam FieldType for Calcite type: REAL
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
at
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at
java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
at
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
at
org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
at
org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)
This is my code:
/**
 * UDF that adds up the elements of a list of floats.
 *
 * <p>Returns the sum of all elements, or {@code 0} when the list itself is {@code null}.
 */
public static class TestUDF implements SerializableFunction<List<Float>, Float> {
    @Override
    public Float apply(List<Float> fv) {
        // A missing list contributes nothing to the sum.
        if (fv == null) {
            return 0f;
        }
        float total = 0f;
        for (Float element : fv) {
            total += element;
        }
        return total;
    }
}
/**
 * Runs "Example 6": turns the integers 1..5 into rows holding the integer and a two-element
 * float array, queries them with Beam SQL using the {@code testUDF} function, and passes the
 * result to {@code Print}.
 *
 * <p>NOTE(review): as reported in this thread, on Beam 2.51.0 applying the SQL transform fails
 * with {@code IllegalArgumentException: Cannot find a matching Beam FieldType for Calcite type:
 * REAL}.
 */
public static void example6() {
    System.out.println("Example 6");

    // Two columns: the integer index and an array of floats.
    final Schema schema =
        Schema.builder()
            .addField("ind", Schema.FieldType.INT32)
            .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
            .build();

    final Pipeline pipeline = createPipeline();
    pipeline
        .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
        .apply(
            ParDo.of(
                new DoFn<Integer, Row>() {
                    @ProcessElement
                    public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
                        // Each row carries the vector [1 * ind, 2 * ind].
                        List<Float> vector = new ArrayList<>();
                        vector.add(1f * ind);
                        vector.add(2f * ind);
                        out.output(
                            Row.withSchema(schema).addValue(ind).addValue(vector).build());
                    }
                }))
        .setRowSchema(schema)
        .apply(
            SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
                .registerUdf("testUDF", new TestUDF()))
        .apply(new Print());

    // Block until the pipeline has finished.
    pipeline.run().waitUntilFinish();
}
--
György Balogh
CTO
E gyorgy.balogh@ultinous.com <zs...@ultinous.com>
M +36 30 270 8342 <+36%2030%20270%208342>
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com
Re: Cannot find a matching Beam FieldType for Calcite type: REAL
Posted by Alexey Romanenko <ar...@gmail.com>.
It seems that Calcite uses REAL for Float values in the SQL transform, while Beam SQL (if I'm not mistaken) doesn’t have a conversion from SQL REAL to any Beam schema field type.
A workaround could be to add such a conversion (REAL -> FLOAT) to CalciteUtils.java.
—
Alexey
> On 12 Oct 2023, at 20:19, Balogh, György <bo...@ultinous.com> wrote:
>
> Hi,
> I'm using beam 2.51.0
> I'm trying to use UDF to transform float arrays and got the following error:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot find a matching Beam FieldType for Calcite type: REAL
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
> at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
> at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
> at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
> at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)
>
>
> This is my code:
>
> public static class TestUDF implements SerializableFunction<List<Float>, Float> {
> @Override
> public Float apply(List<Float> fv) {
> float sum = 0;
> if(fv != null)
> for (Float a : fv)
> sum += a;
> return sum;
> }
> }
>
> public static void example6() {
> System.out.println("Example 6");
>
> Schema rowSchema = Schema.builder()
> .addField("ind", Schema.FieldType.INT32)
> .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
> .build();
>
> Pipeline p = createPipeline();
> p
> .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
> .apply(ParDo.of(new DoFn<Integer, Row>() {
> @ProcessElement
> public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
> Row.Builder rowBuilder = Row.withSchema(rowSchema);
> List<Float> fv = new ArrayList<Float>();
> fv.add(1f * ind);
> fv.add(2f * ind);
> Row row = rowBuilder
> .addValue(ind)
> .addValue(fv)
> .build();
> out.output(row);
> }
> }))
> .setRowSchema(rowSchema)
> .apply(
> SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
> .registerUdf("testUDF", new TestUDF())
> )
> .apply(new Print());
> p.run().waitUntilFinish();
> }
>
>
> --
>
> György Balogh
> CTO
> E gyorgy.balogh@ultinous.com <ma...@ultinous.com>
> M +36 30 270 8342 <tel:+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com <http://www.ultinous.com/>