Posted to user@flink.apache.org by André Midea Jasiskis <mi...@gmail.com> on 2022/10/19 09:08:35 UTC

Table to DataStream conversion - Error while generating structured type converter.

Hi all,

I would like to convert between Tables and DataStreams interchangeably, with
the ability to specify which types are used when converting from a POJO to a
Table and vice versa.

To convert from a POJO to a Table I'm creating a Schema from a ROW-based
DataType, and to convert from a Table back to a DataStream I'm using a
STRUCTURED DataType.
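
In API terms, the round trip I'm after looks roughly like this (a minimal
sketch against the Java Table API, which is what the Scala 3 code below calls
into anyway; tableEnv and pojoStream stand in for names from the full example,
and schema / dataTypeStructured are defined there):

> // POJO -> Table: pass a Schema built from the ROW-based DataType
> Table table = tableEnv.fromDataStream(pojoStream, schema);
> // Table -> POJO: pass an explicit STRUCTURED DataType for the target class
> DataStream<TransferEvent> back = tableEnv.toDataStream(table, dataTypeStructured);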

The issue is that I get an exception when converting the table back to a
DataStream:

>
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> ...
> Caused by: org.codehaus.commons.compiler.CompileException: Line 11, Column 106: Cannot cast "flink_examples.datastream_table.Transfer" to "org.apache.flink.types.Row"
>

I have tried many different approaches, but I can never get a solution that
works for all cases. The example below works just fine when the source of my
table is a table connector (e.g. Kafka), but not when the table is created
from a DataStream (which is what we use for testing purposes).
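
To make the contrast concrete (a hedged sketch; "transfers_kafka" is a
hypothetical connector-backed table with the same schema, not part of the
example below):

> // Works: the table is backed by a table connector
> tableEnv.toDataStream(tableEnv.from("transfers_kafka"), dataTypeStructured);
> // Fails with the exception below: the table is created from a DataStream of POJOs
> tableEnv.toDataStream(tableEnv.fromDataStream(pojoStream, schema), dataTypeStructured);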

 The full code and exception are as follows:

> import java.time.Instant
> import java.time.temporal.ChronoUnit
>
> import org.apache.flink.api.common.typeinfo.TypeInformation
> import org.apache.flink.streaming.api.datastream.{DataStream, SingleOutputStreamOperator}
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import org.apache.flink.table.api.{DataTypes, Schema}
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment
>
> case class Transfer(var transferId: String, var amount: math.BigDecimal, var customerId: String) {
>   def this() = this(null, null, null)
> }
>
> case class Metadata(var createdAt: Instant) {
>   def this() = this(null)
> }
>
> case class TransferEvent(var data: Transfer, var meta: Metadata) {
>   def this() = this(null, null)
> }
>
> object ConvertBetweenDataStreamAndTable:
>   val t1 = Instant.now().minus(1000, ChronoUnit.SECONDS)
>   val t2 = Instant.now().minus(500, ChronoUnit.SECONDS)
>   val t3 = Instant.now().minus(200, ChronoUnit.SECONDS)
>
>   val transfer1 = TransferEvent(Transfer("transfer1", math.BigDecimal.valueOf(100), "customer1"), Metadata(t1))
>   val transfer2 = TransferEvent(Transfer("transfer2", math.BigDecimal.valueOf(50), "customer2"), Metadata(t2))
>   val transfer3 = TransferEvent(Transfer("transfer3", math.BigDecimal.valueOf(10), "customer1"), Metadata(t3))
>
>   // Target type for Table -> DataStream: the POJO class with ROW-typed fields
>   val dataTypeStructured = DataTypes.STRUCTURED(
>     classOf[TransferEvent],
>     DataTypes.FIELD("data", DataTypes.ROW(
>       DataTypes.FIELD("transferId", DataTypes.STRING()),
>       DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 10)),
>       DataTypes.FIELD("customerId", DataTypes.STRING()))),
>     DataTypes.FIELD("meta", DataTypes.ROW(
>       DataTypes.FIELD("createdAt", DataTypes.TIMESTAMP_LTZ(3))))
>   )
>
>   // Row type used to build the Schema for DataStream -> Table
>   val dataTypeRow = DataTypes.ROW(
>     DataTypes.FIELD("data", DataTypes.ROW(
>       DataTypes.FIELD("transferId", DataTypes.STRING()),
>       DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 10)),
>       DataTypes.FIELD("customerId", DataTypes.STRING()))),
>     DataTypes.FIELD("meta", DataTypes.ROW(
>       DataTypes.FIELD("createdAt", DataTypes.TIMESTAMP_LTZ(3))))
>   )
>
>   val schema = Schema.newBuilder().fromRowDataType(dataTypeRow).build()
>
>   def workflow() =
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val tEnv = StreamTableEnvironment.create(env)
>
>     val flinkType = TypeInformation.of(classOf[TransferEvent])
>
>     println(dataTypeRow)
>
>     val ds: SingleOutputStreamOperator[TransferEvent] =
>       env.fromElements(transfer1, transfer2, transfer3).returns(flinkType)
>     val table = tEnv.fromDataStream(ds, schema)
>     tEnv.createTemporaryView("transfers", table)
>
>     tEnv.from("transfers").printSchema()
>
>     val x = tEnv.sqlQuery("select data, meta from transfers")
>
>     val tableAsDataStream: DataStream[TransferEvent] = tEnv.toDataStream(x, dataTypeStructured)
>     tableAsDataStream.print()
>     env.execute()
>
> end ConvertBetweenDataStreamAndTable
>
> @main
> def main() =
>   ConvertBetweenDataStreamAndTable.workflow()



Exception:

> ROW<`data` ROW<`transferId` STRING, `amount` DECIMAL(38, 10), `customerId` STRING>, `meta` ROW<`createdAt` TIMESTAMP_LTZ(3)>>
> (
>   `data` ROW<`transferId` STRING, `amount` DECIMAL(38, 10), `customerId` STRING>,
>   `meta` ROW<`createdAt` TIMESTAMP_LTZ(3)>
> )
> /* 1 */ public class flink_examples$datastream_table$TransferEvent$3$Converter implements org.apache.flink.table.data.conversion.DataStructureConverter {
> /* 2 */     private final org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters;
> /* 3 */     private final org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
> /* 4 */     public flink_examples$datastream_table$TransferEvent$3$Converter(org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters) {
> /* 5 */         this.fieldGetters = fieldGetters;
> /* 6 */         this.fieldConverters = fieldConverters;
> /* 7 */     }
> /* 8 */     public java.lang.Object toInternal(java.lang.Object o) {
> /* 9 */         final flink_examples.datastream_table.TransferEvent external = (flink_examples.datastream_table.TransferEvent) o;
> /* 10 */        final org.apache.flink.table.data.GenericRowData genericRow = new org.apache.flink.table.data.GenericRowData(2);
> /* 11 */        genericRow.setField(0, fieldConverters[0].toInternalOrNull(((org.apache.flink.types.Row) external.data())));
> /* 12 */        genericRow.setField(1, fieldConverters[1].toInternalOrNull(((org.apache.flink.types.Row) external.meta())));
> /* 13 */        return genericRow;
> /* 14 */    }
> /* 15 */    public java.lang.Object toExternal(java.lang.Object o) {
> /* 16 */        final org.apache.flink.table.data.RowData internal = (org.apache.flink.table.data.RowData) o;
> /* 17 */        final flink_examples.datastream_table.TransferEvent structured = new flink_examples.datastream_table.TransferEvent();
> /* 18 */        structured.data_$eq(((flink_examples.datastream_table.Transfer) fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal))));
> /* 19 */        structured.meta_$eq(((flink_examples.datastream_table.Metadata) fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal))));
> /* 20 */        return structured;
> /* 21 */    }
> /* 22 */}
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Error while generating structured type converter.
> at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
> at
> org.apache.flink.table.runtime.typeutils.ExternalSerializer.initializeConverter(ExternalSerializer.java:217)
> at
> org.apache.flink.table.runtime.typeutils.ExternalSerializer.<init>(ExternalSerializer.java:78)
> at
> org.apache.flink.table.runtime.typeutils.ExternalSerializer.of(ExternalSerializer.java:93)
> at
> org.apache.flink.table.runtime.typeutils.ExternalTypeInfo.createExternalTypeSerializer(ExternalTypeInfo.java:97)
> at
> org.apache.flink.table.runtime.typeutils.ExternalTypeInfo.of(ExternalTypeInfo.java:67)
> at
> org.apache.flink.table.planner.connectors.ExternalDynamicSink.lambda$getSinkRuntimeProvider$2(ExternalDynamicSink.java:115)
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.applySinkProvider(CommonExecSink.java:452)
> at
> org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink.createSinkTransformation(CommonExecSink.java:193)
> at
> org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:167)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:82)
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
> at scala.collection.Iterator.foreach(Iterator.scala:937)
> at scala.collection.Iterator.foreach$(Iterator.scala:937)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
> at scala.collection.IterableLike.foreach(IterableLike.scala:70)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike.map(TraversableLike.scala:233)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:81)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188)
> at
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223)
> at
> org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:218)
> at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:245)
> at
> flink_examples.datastream_table.ConvertBetweenDataStreamAndTable$.workflow(ConvertBetweenDataStreamAndTable.scala:81)
> at
> flink_examples.datastream_table.ConvertBetweenDataStreamAndTable$package$.main(ConvertBetweenDataStreamAndTable.scala:89)
> at
> flink_examples.datastream_table.main.main(ConvertBetweenDataStreamAndTable.scala:87)
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
> at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
> ... 29 more
> Caused by:
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
> ... 30 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
> ... 33 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 11, Column
> 106: Cannot cast "flink_examples.datastream_table.Transfer" to
> "org.apache.flink.types.Row"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
> at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
> at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
> at
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
> at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
> at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
> ... 39 more
>
> Process finished with exit code 1
>


I'd appreciate any help.

Best,
Andre

Re: Table to DataStream conversion - Error while generating structured type converter.

Posted by André Midea Jasiskis <mi...@gmail.com>.
Hi again,

To be extra sure it isn't something Scala-related, I've created the same
example in Java:

> import java.math.BigDecimal;
> import java.time.Instant;
> import java.time.temporal.ChronoUnit;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.annotation.DataTypeHint;
> import org.apache.flink.table.api.*;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.table.types.DataType;
>
> public class ConvertBetweenDataStreamAndTable {
>
>     public static class Transfer {
>         public String transferId;
>         @DataTypeHint(defaultDecimalScale = 18, defaultDecimalPrecision = 38)
>         public BigDecimal amount;
>         public String customerId;
>
>         public Transfer(String transferId, BigDecimal amount, String customerId) {
>             this.transferId = transferId;
>             this.amount = amount;
>             this.customerId = customerId;
>         }
>
>         public Transfer() { }
>     }
>
>     public static class Metadata {
>         public Instant createdAt;
>
>         public Metadata(Instant createdAt) {
>             this.createdAt = createdAt;
>         }
>
>         public Metadata() { }
>     }
>
>     public static class TransferEvent {
>         public Transfer data;
>         public Metadata meta;
>
>         public TransferEvent(Transfer data, Metadata meta) {
>             this.data = data;
>             this.meta = meta;
>         }
>
>         public TransferEvent() { }
>     }
>
>     public static void main(String[] args) throws Exception {
>         new ConvertBetweenDataStreamAndTable().workflow();
>     }
>
>     public void workflow() throws Exception {
>         Instant t1 = Instant.now().minus(1000, ChronoUnit.SECONDS);
>         Instant t2 = Instant.now().minus(500, ChronoUnit.SECONDS);
>         Instant t3 = Instant.now().minus(200, ChronoUnit.SECONDS);
>
>         TransferEvent transfer1 = new TransferEvent(new Transfer("transfer1", BigDecimal.valueOf(100), "customer1"), new Metadata(t1));
>         TransferEvent transfer2 = new TransferEvent(new Transfer("transfer2", BigDecimal.valueOf(50), "customer2"), new Metadata(t2));
>         TransferEvent transfer3 = new TransferEvent(new Transfer("transfer3", BigDecimal.valueOf(10), "customer1"), new Metadata(t3));
>
>         // Row-based DataType used to build the Schema for DataStream -> Table
>         DataType dataType = DataTypes.ROW(
>                 DataTypes.FIELD("data", DataTypes.ROW(
>                         DataTypes.FIELD("transferId", DataTypes.STRING()),
>                         DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 10)),
>                         DataTypes.FIELD("customerId", DataTypes.STRING()))),
>                 DataTypes.FIELD("meta", DataTypes.ROW(
>                         DataTypes.FIELD("createdAt", DataTypes.TIMESTAMP_LTZ(3)))));
>
>         Schema schema = Schema.newBuilder().fromRowDataType(dataType).build();
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         TypeInformation<TransferEvent> of = TypeInformation.of(TransferEvent.class);
>         SingleOutputStreamOperator<TransferEvent> transfers = env.fromElements(transfer1, transfer2, transfer3).returns(of);
>         Table table = tableEnv.fromDataStream(transfers, schema);
>         tableEnv.createTemporaryView("transfers", table);
>
>         Table x = tableEnv.sqlQuery("select data, meta from transfers");
>
>         // Table -> DataStream, target type derived reflectively from the POJO class
>         DataStream<TransferEvent> objectDataStream = tableEnv.toDataStream(x, DataTypes.of(TransferEvent.class));
>         objectDataStream.print();
>         env.execute();
>     }
> }

I get the exact same exception:

> SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
> SLF4J: Defaulting to no-operation (NOP) logger implementation
> SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/andre.midea/Library/Caches/Coursier/v1/https/repo1.maven.org/maven2/org/apache/flink/flink-core/1.15.2/flink-core-1.15.2.jar) to field java.lang.String.value
> WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
> WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> /* 1 */ public class flink_examples$java$datastream_table$ConvertBetweenDataStreamAndTable$TransferEvent$1$Converter implements org.apache.flink.table.data.conversion.DataStructureConverter {
> /* 2 */     private final org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters;
> /* 3 */     private final org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters;
> /* 4 */     public flink_examples$java$datastream_table$ConvertBetweenDataStreamAndTable$TransferEvent$1$Converter(org.apache.flink.table.data.RowData.FieldGetter[] fieldGetters, org.apache.flink.table.data.conversion.DataStructureConverter[] fieldConverters) {
> /* 5 */         this.fieldGetters = fieldGetters;
> /* 6 */         this.fieldConverters = fieldConverters;
> /* 7 */     }
> /* 8 */     public java.lang.Object toInternal(java.lang.Object o) {
> /* 9 */         final flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent external = (flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent) o;
> /* 10 */        final org.apache.flink.table.data.GenericRowData genericRow = new org.apache.flink.table.data.GenericRowData(2);
> /* 11 */        genericRow.setField(0, fieldConverters[0].toInternalOrNull(((org.apache.flink.types.Row) external.data)));
> /* 12 */        genericRow.setField(1, fieldConverters[1].toInternalOrNull(((org.apache.flink.types.Row) external.meta)));
> /* 13 */        return genericRow;
> /* 14 */    }
> /* 15 */    public java.lang.Object toExternal(java.lang.Object o) {
> /* 16 */        final org.apache.flink.table.data.RowData internal = (org.apache.flink.table.data.RowData) o;
> /* 17 */        final flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent structured = new flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.TransferEvent();
> /* 18 */        structured.data = ((flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.Transfer) fieldConverters[0].toExternalOrNull(fieldGetters[0].getFieldOrNull(internal)));
> /* 19 */        structured.meta = ((flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable.Metadata) fieldConverters[1].toExternalOrNull(fieldGetters[1].getFieldOrNull(internal)));
> /* 20 */        return structured;
> /* 21 */    }
> /* 22 */}
>
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
> at
> org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
> at
> java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:259)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1389)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
> at
> java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
> at
> java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
> at
> java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
> at
> java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
> at akka.dispatch.OnComplete.internal(Future.scala:300)
> at akka.dispatch.OnComplete.internal(Future.scala:297)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
> at
> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
> at
> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
> at
> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
> at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
> at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
> at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
> at
> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
> at
> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> at
> java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
> at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
> at
> java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed
> by NoRestartBackoffTimeStrategy
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
> at
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:301)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:291)
> at
> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:282)
> at
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:739)
> at
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
> at
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:443)
> at jdk.internal.reflect.GeneratedMethodAccessor28.invoke(Unknown Source)
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:304)
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:302)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
> ... 5 more
> Caused by: org.apache.flink.table.api.TableException: Error while
> generating structured type converter.
> at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:89)
> at
> org.apache.flink.table.runtime.connector.source.DataStructureConverterWrapper.open(DataStructureConverterWrapper.java:46)
> at
> org.apache.flink.table.runtime.operators.source.InputConversionOperator.open(InputConversionOperator.java:76)
> at
> org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.util.FlinkRuntimeException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
> at
> org.apache.flink.table.data.conversion.StructuredObjectConverter.open(StructuredObjectConverter.java:80)
> ... 12 more
> Caused by:
> org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException:
> org.apache.flink.api.common.InvalidProgramException: Table program cannot
> be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
> ... 13 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> program cannot be compiled. This is a bug. Please file an issue.
> at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
> at
> org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
> ... 16 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 11, Column
> 106: Cannot cast
> "flink_examples.java.datastream_table.ConvertBetweenDataStreamAndTable$Transfer"
> to "org.apache.flink.types.Row"
> at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5051)
> at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)
> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4418)
> at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$Cast.accept(Java.java:4898)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5057)
> at org.codehaus.janino.UnitCompiler.access$8100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4409)
> at
> org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4400)
> at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:4924)
> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4400)
> at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$Lvalue.accept(Java.java:4148)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5182)
> at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
> at
> org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
> at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
> at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
> at
> org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
> at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
> at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
> at
> org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
> at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
> at
> org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
> at
> org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
> at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> at
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> at
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> at
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> at
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
> ... 22 more
>
> Process finished with exit code 1
>
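
Looking at line 11 of the generated converter, the failing cast seems to come
straight from the Schema: because data and meta are declared as ROW, the
generated converter expects org.apache.flink.types.Row fields, while the
actual fields on the POJO are the structured classes Transfer and Metadata.
One workaround I intend to try (an untested sketch, assuming Schema.newBuilder
accepts nested STRUCTURED types) is declaring the nested fields as STRUCTURED
of the POJO classes instead of ROW:

> // Untested sketch: nested STRUCTURED types instead of ROW, so that the
> // generated converter would cast to Transfer/Metadata rather than Row.
> DataType dataTypeStructured = DataTypes.STRUCTURED(
>         TransferEvent.class,
>         DataTypes.FIELD("data", DataTypes.STRUCTURED(
>                 Transfer.class,
>                 DataTypes.FIELD("transferId", DataTypes.STRING()),
>                 DataTypes.FIELD("amount", DataTypes.DECIMAL(38, 10)),
>                 DataTypes.FIELD("customerId", DataTypes.STRING()))),
>         DataTypes.FIELD("meta", DataTypes.STRUCTURED(
>                 Metadata.class,
>                 DataTypes.FIELD("createdAt", DataTypes.TIMESTAMP_LTZ(3)))));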
