You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2019/07/17 08:29:00 UTC

[Table API] ClassCastException when converting a table to DataStream

Hello,

Consider the following snippet:

>     Table sourceTable = getKafkaSource0(tEnv);
>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>
> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>     stream.print();
>
where sourceTable.printSchema() shows:

> root
>  |-- time1: TimeIndicatorTypeInfo(rowtime)



 This program returns the following exception:

> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
> at app.metatron.test.Main2.main(Main2.java:231)
> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
> cast to java.lang.Long*
> * at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> ...


The row serializer seems to try to deep-copy an instance of
java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
Could anybody help me?

Best,

- Dongwon

p.s. though removing .returns() makes everything okay, I need to do that as
I want to convert DataStream<Row> into another table later.
p.s. the source table is created as follows:

private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>     ConnectorDescriptor connectorDescriptor = new Kafka()
>       .version("universal")
>       .topic("mytopic")
>       .property("bootstrap.servers", "localhost:9092")
>       .property("group.id", "mygroup")
>       .startFromEarliest();
>     FormatDescriptor formatDescriptor = new Csv()
>       .deriveSchema()
>       .ignoreParseErrors()
>       .fieldDelimiter(',');
>     Schema schemaDescriptor = new Schema()
>       .field("time1", SQL_TIMESTAMP())
>       .rowtime(
>         new Rowtime()
>           .timestampsFromField("rowTime")
>           .watermarksPeriodicBounded(100)
>       );
>     tEnv.connect(connectorDescriptor)
>       .withFormat(formatDescriptor)
>       .withSchema(schemaDescriptor)
>       .inAppendMode()
>       .registerTableSource("mysrc");
>     return tEnv.scan("mysrc");
>   }

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Rong Rong <wa...@gmail.com>.
Hi Dongwon,

Sorry for the late reply. I did try some experiment and seems like you are
right:
Setting the `.return()` type actually alter the underlying type of the
DataStream from a GenericType into a specific RowTypeInfo. Please see the
JIRA ticket [1] for more info.

Regarding the approach, yes I think you cannot access the timer service
from the table/SQL API at this moment so that might be the best approach.
And as Fabian suggested, I don't think there's too much problem if you are
not changing the type info underlying in your DataStream. I will follow up
with this in the JIRA ticket.

--
Rong

[1] https://issues.apache.org/jira/browse/FLINK-13389

On Tue, Jul 23, 2019 at 6:30 AM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Fabian,
>
> Thanks for clarification :-)
> I could convert back and forth without worrying about it as I keep using
> Row type during the conversion (even though fields are added).
>
> Best,
>
> Dongwon
>
>
>
> On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> regarding the question about the conversion: If you keep using the Row
>> type and not adding/removing fields, the conversion is pretty much for free
>> right now.
>> It will be a MapFunction (sometimes even not function at all) that should
>> be chained with the other operators. Hence, it should boil down to a
>> function call.
>>
>> Best, Fabian
>>
>> Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
>> eastcirclek@gmail.com>:
>>
>>> Hi Rong,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>
>>> Thanks a lot for spending your time on this.
>>>
>>> However from what you explained, if I understand correctly you can do
>>>> all of your processing within the TableAPI scope without converting it back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement and connect with the table
>>>> API via UserDefinedFunction[1].
>>>> As TableAPI becoming the first class citizen [2,3,4], this would be
>>>> much cleaner implementation from my perspective.
>>>
>>> I also agree with you in that the first class citizen Table API will
>>> make everything not only easier but also a lot cleaner.
>>> We however contain some corner cases that force us to covert Table from
>>> and to DataStream.
>>> One such case is to append to Table a column showing the current
>>> watermark of each record; there's no other way but to do that as
>>> ScalarFunction doesn't allow us to get the runtime context information as
>>> ProcessFunction does.
>>>
>>> I have a question regarding the conversion.
>>> Do I have to worry about runtime performance penalty in case that I
>>> cannot help but convert back and fourth to DataStream?
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <wa...@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>>
>>>> However from what you explained, if I understand correctly you can do
>>>> all of your processing within the TableAPI scope without converting it back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>>> function that's simple enough, you can implement and connect with the table
>>>> API via UserDefinedFunction[1].
>>>> As TableAPI becoming the first class citizen [2,3,4], this would be
>>>> much cleaner implementation from my perspective.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>>> [2]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>>> [3]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>>> [4]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>>
>>>>
>>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> Thank you for reply :-)
>>>>>
>>>>> which Flink version are you using?
>>>>>
>>>>> I'm using Flink-1.8.0.
>>>>>
>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>>
>>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>>
>>>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>>>
>>>>> *".map(a->a)"* is just to illustrate a problem.
>>>>> My actual code contains a process function (instead of .map() in the
>>>>> snippet) which appends a new field containing watermark to a row.
>>>>> If there were ways to get watermark inside a scalar UDF, I wouldn't
>>>>> convert table to datastream and vice versa.
>>>>>
>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime, is that want your intension is to use it later as well?
>>>>>
>>>>> yup :-)
>>>>>
>>>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>>>>>> adds a type information hint about the return type of this operator. It is
>>>>>> used in cases where Flink cannot determine automatically[1].
>>>>>
>>>>> The reason why I specify
>>>>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>>>>> information hint as you said.
>>>>> That is needed later when I need to make another table like
>>>>>    "*Table anotherTable = tEnv.fromDataStream(stream);"*,
>>>>> Without the type information hint, I've got an error
>>>>>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>>> Please specify the type of the input with a RowTypeInfo."*
>>>>> That's why I give a type information hint in that way.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> Can you provide a bit more information:
>>>>>> which Flink version are you using?
>>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime, is that want your intension is to use it later as well?
>>>>>>
>>>>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"*
>>>>>> only adds a type information hint about the return type of this operator.
>>>>>> It is used in cases where Flink cannot determine automatically[1].
>>>>>>
>>>>>> Thanks,
>>>>>> Rong
>>>>>>
>>>>>> --
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>>
>>>>>>
>>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hello,
>>>>>>>
>>>>>>> Consider the following snippet:
>>>>>>>
>>>>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable,
>>>>>>>> Row.class)
>>>>>>>>
>>>>>>>> *      .map(a -> a)
>>>>>>>> .returns(sourceTable.getSchema().toRowType());*
>>>>>>>>     stream.print();
>>>>>>>>
>>>>>>> where sourceTable.printSchema() shows:
>>>>>>>
>>>>>>>> root
>>>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>  This program returns the following exception:
>>>>>>>
>>>>>>>> Exception in thread "main"
>>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>>> at
>>>>>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>>> at
>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot
>>>>>>>> be cast to java.lang.Long*
>>>>>>>> * at
>>>>>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>>>>> at
>>>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>>> at
>>>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>>> ...
>>>>>>>
>>>>>>>
>>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>>>>> Could anybody help me?
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> - Dongwon
>>>>>>>
>>>>>>> p.s. though removing .returns() makes everything okay, I need to do
>>>>>>> that as I want to convert DataStream<Row> into another table later.
>>>>>>> p.s. the source table is created as follows:
>>>>>>>
>>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment
>>>>>>>> tEnv) {
>>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>>       .version("universal")
>>>>>>>>       .topic("mytopic")
>>>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>>>       .property("group.id", "mygroup")
>>>>>>>>       .startFromEarliest();
>>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>>       .deriveSchema()
>>>>>>>>       .ignoreParseErrors()
>>>>>>>>       .fieldDelimiter(',');
>>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>>>>       .rowtime(
>>>>>>>>         new Rowtime()
>>>>>>>>           .timestampsFromField("rowTime")
>>>>>>>>           .watermarksPeriodicBounded(100)
>>>>>>>>       );
>>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>>       .withFormat(formatDescriptor)
>>>>>>>>       .withSchema(schemaDescriptor)
>>>>>>>>       .inAppendMode()
>>>>>>>>       .registerTableSource("mysrc");
>>>>>>>>     return tEnv.scan("mysrc");
>>>>>>>>   }
>>>>>>>
>>>>>>>

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Fabian,

Thanks for clarification :-)
I could convert back and forth without worrying about it as I keep using
Row type during the conversion (even though fields are added).

Best,

Dongwon



On Tue, Jul 23, 2019 at 8:15 PM Fabian Hueske <fh...@gmail.com> wrote:

> Hi Dongwon,
>
> regarding the question about the conversion: If you keep using the Row
> type and not adding/removing fields, the conversion is pretty much for free
> right now.
> It will be a MapFunction (sometimes even not function at all) that should
> be chained with the other operators. Hence, it should boil down to a
> function call.
>
> Best, Fabian
>
> Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
> eastcirclek@gmail.com>:
>
>> Hi Rong,
>>
>> I have to dig deeper into the code to reproduce this error. This seems to
>>> be a bug to me and will update once I find anything.
>>
>> Thanks a lot for spending your time on this.
>>
>> However from what you explained, if I understand correctly you can do all
>>> of your processing within the TableAPI scope without converting it back and
>>> forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement and connect with the table
>>> API via UserDefinedFunction[1].
>>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>>> cleaner implementation from my perspective.
>>
>> I also agree with you in that the first class citizen Table API will make
>> everything not only easier but also a lot cleaner.
>> We however contain some corner cases that force us to covert Table from
>> and to DataStream.
>> One such case is to append to Table a column showing the current
>> watermark of each record; there's no other way but to do that as
>> ScalarFunction doesn't allow us to get the runtime context information as
>> ProcessFunction does.
>>
>> I have a question regarding the conversion.
>> Do I have to worry about runtime performance penalty in case that I
>> cannot help but convert back and fourth to DataStream?
>>
>> Best,
>>
>> Dongwon
>>
>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <wa...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> I have to dig deeper into the code to reproduce this error. This seems
>>> to be a bug to me and will update once I find anything.
>>>
>>> However from what you explained, if I understand correctly you can do
>>> all of your processing within the TableAPI scope without converting it back
>>> and forth to DataStream.
>>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>>> function that's simple enough, you can implement and connect with the table
>>> API via UserDefinedFunction[1].
>>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>>> cleaner implementation from my perspective.
>>>
>>> --
>>> Rong
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>> [2]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>> [3]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>> [4]
>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>
>>>
>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com>
>>> wrote:
>>>
>>>> Hi Rong,
>>>>
>>>> Thank you for reply :-)
>>>>
>>>> which Flink version are you using?
>>>>
>>>> I'm using Flink-1.8.0.
>>>>
>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>
>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>
>>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>>
>>>> *".map(a->a)"* is just to illustrate a problem.
>>>> My actual code contains a process function (instead of .map() in the
>>>> snippet) which appends a new field containing watermark to a row.
>>>> If there were ways to get watermark inside a scalar UDF, I wouldn't
>>>> convert table to datastream and vice versa.
>>>>
>>>> if I am understanding correctly, you are also using "time1" as the
>>>>> rowtime, is that want your intension is to use it later as well?
>>>>
>>>> yup :-)
>>>>
>>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>>>>> adds a type information hint about the return type of this operator. It is
>>>>> used in cases where Flink cannot determine automatically[1].
>>>>
>>>> The reason why I specify
>>>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>>>> information hint as you said.
>>>> That is needed later when I need to make another table like
>>>>    "*Table anotherTable = tEnv.fromDataStream(stream);"*,
>>>> Without the type information hint, I've got an error
>>>>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>> Please specify the type of the input with a RowTypeInfo."*
>>>> That's why I give a type information hint in that way.
>>>>
>>>> Best,
>>>>
>>>> Dongwon
>>>>
>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com> wrote:
>>>>
>>>>> Hi Dongwon,
>>>>>
>>>>> Can you provide a bit more information:
>>>>> which Flink version are you using?
>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>> rowtime, is that want your intension is to use it later as well?
>>>>>
>>>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"*
>>>>> only adds a type information hint about the return type of this operator.
>>>>> It is used in cases where Flink cannot determine automatically[1].
>>>>>
>>>>> Thanks,
>>>>> Rong
>>>>>
>>>>> --
>>>>> [1]
>>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>>
>>>>>
>>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> Consider the following snippet:
>>>>>>
>>>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable,
>>>>>>> Row.class)
>>>>>>>
>>>>>>> *      .map(a -> a)
>>>>>>> .returns(sourceTable.getSchema().toRowType());*
>>>>>>>     stream.print();
>>>>>>>
>>>>>> where sourceTable.printSchema() shows:
>>>>>>
>>>>>>> root
>>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>>
>>>>>>
>>>>>>
>>>>>>  This program returns the following exception:
>>>>>>
>>>>>>> Exception in thread "main"
>>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>>> at
>>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>>> at
>>>>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>>> at
>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot
>>>>>>> be cast to java.lang.Long*
>>>>>>> * at
>>>>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>>>> at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>>> at
>>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>>> ...
>>>>>>
>>>>>>
>>>>>> The row serializer seems to try to deep-copy an instance of
>>>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>>>> Could anybody help me?
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> - Dongwon
>>>>>>
>>>>>> p.s. though removing .returns() makes everything okay, I need to do
>>>>>> that as I want to convert DataStream<Row> into another table later.
>>>>>> p.s. the source table is created as follows:
>>>>>>
>>>>>> private static final Table getKafkaSource0(StreamTableEnvironment
>>>>>>> tEnv) {
>>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>>       .version("universal")
>>>>>>>       .topic("mytopic")
>>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>>       .property("group.id", "mygroup")
>>>>>>>       .startFromEarliest();
>>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>>       .deriveSchema()
>>>>>>>       .ignoreParseErrors()
>>>>>>>       .fieldDelimiter(',');
>>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>>>       .rowtime(
>>>>>>>         new Rowtime()
>>>>>>>           .timestampsFromField("rowTime")
>>>>>>>           .watermarksPeriodicBounded(100)
>>>>>>>       );
>>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>>       .withFormat(formatDescriptor)
>>>>>>>       .withSchema(schemaDescriptor)
>>>>>>>       .inAppendMode()
>>>>>>>       .registerTableSource("mysrc");
>>>>>>>     return tEnv.scan("mysrc");
>>>>>>>   }
>>>>>>
>>>>>>

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dongwon,

regarding the question about the conversion: If you keep using the Row type
and not adding/removing fields, the conversion is pretty much for free
right now.
It will be a MapFunction (sometimes even not function at all) that should
be chained with the other operators. Hence, it should boil down to a
function call.

Best, Fabian

Am Sa., 20. Juli 2019 um 03:58 Uhr schrieb Dongwon Kim <
eastcirclek@gmail.com>:

> Hi Rong,
>
> I have to dig deeper into the code to reproduce this error. This seems to
>> be a bug to me and will update once I find anything.
>
> Thanks a lot for spending your time on this.
>
> However from what you explained, if I understand correctly you can do all
>> of your processing within the TableAPI scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement and connect with the table
>> API via UserDefinedFunction[1].
>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>> cleaner implementation from my perspective.
>
> I also agree with you in that the first class citizen Table API will make
> everything not only easier but also a lot cleaner.
> We however contain some corner cases that force us to covert Table from
> and to DataStream.
> One such case is to append to Table a column showing the current watermark
> of each record; there's no other way but to do that as ScalarFunction
> doesn't allow us to get the runtime context information as ProcessFunction
> does.
>
> I have a question regarding the conversion.
> Do I have to worry about runtime performance penalty in case that I cannot
> help but convert back and fourth to DataStream?
>
> Best,
>
> Dongwon
>
> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <wa...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> I have to dig deeper into the code to reproduce this error. This seems to
>> be a bug to me and will update once I find anything.
>>
>> However from what you explained, if I understand correctly you can do all
>> of your processing within the TableAPI scope without converting it back and
>> forth to DataStream.
>> E.g. if your "map(a -> a)" placeholder represents some sort of map
>> function that's simple enough, you can implement and connect with the table
>> API via UserDefinedFunction[1].
>> As TableAPI becoming the first class citizen [2,3,4], this would be much
>> cleaner implementation from my perspective.
>>
>> --
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>> [2]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>> [3]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>> [4]
>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>
>>
>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com>
>> wrote:
>>
>>> Hi Rong,
>>>
>>> Thank you for reply :-)
>>>
>>> which Flink version are you using?
>>>
>>> I'm using Flink-1.8.0.
>>>
>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>
>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>
>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>
>>> *".map(a->a)"* is just to illustrate a problem.
>>> My actual code contains a process function (instead of .map() in the
>>> snippet) which appends a new field containing watermark to a row.
>>> If there were ways to get watermark inside a scalar UDF, I wouldn't
>>> convert table to datastream and vice versa.
>>>
>>> if I am understanding correctly, you are also using "time1" as the
>>>> rowtime, is that want your intension is to use it later as well?
>>>
>>> yup :-)
>>>
>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>>>> adds a type information hint about the return type of this operator. It is
>>>> used in cases where Flink cannot determine automatically[1].
>>>
>>> The reason why I specify
>>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>>> information hint as you said.
>>> That is needed later when I need to make another table like
>>>    "*Table anotherTable = tEnv.fromDataStream(stream);"*,
>>> Without the type information hint, I've got an error
>>>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
>>> Please specify the type of the input with a RowTypeInfo."*
>>> That's why I give a type information hint in that way.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> Can you provide a bit more information:
>>>> which Flink version are you using?
>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>> what is the line *".map(a -> a)" *do and can you remove it?
>>>> if I am understanding correctly, you are also using "time1" as the
>>>> rowtime, is that want your intension is to use it later as well?
>>>>
>>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"*
>>>> only adds a type information hint about the return type of this operator.
>>>> It is used in cases where Flink cannot determine automatically[1].
>>>>
>>>> Thanks,
>>>> Rong
>>>>
>>>> --
>>>> [1]
>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>>
>>>>
>>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello,
>>>>>
>>>>> Consider the following snippet:
>>>>>
>>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable,
>>>>>> Row.class)
>>>>>>
>>>>>> *      .map(a -> a)
>>>>>> .returns(sourceTable.getSchema().toRowType());*
>>>>>>     stream.print();
>>>>>>
>>>>> where sourceTable.printSchema() shows:
>>>>>
>>>>>> root
>>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>>
>>>>>
>>>>>
>>>>>  This program returns the following exception:
>>>>>
>>>>>> Exception in thread "main"
>>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>>> at
>>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>>> at
>>>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot
>>>>>> be cast to java.lang.Long*
>>>>>> * at
>>>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>>> at
>>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>>> ...
>>>>>
>>>>>
>>>>> The row serializer seems to try to deep-copy an instance of
>>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>>> Could anybody help me?
>>>>>
>>>>> Best,
>>>>>
>>>>> - Dongwon
>>>>>
>>>>> p.s. though removing .returns() makes everything okay, I need to do
>>>>> that as I want to convert DataStream<Row> into another table later.
>>>>> p.s. the source table is created as follows:
>>>>>
>>>>> private static final Table getKafkaSource0(StreamTableEnvironment
>>>>>> tEnv) {
>>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>>       .version("universal")
>>>>>>       .topic("mytopic")
>>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>>       .property("group.id", "mygroup")
>>>>>>       .startFromEarliest();
>>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>>       .deriveSchema()
>>>>>>       .ignoreParseErrors()
>>>>>>       .fieldDelimiter(',');
>>>>>>     Schema schemaDescriptor = new Schema()
>>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>>       .rowtime(
>>>>>>         new Rowtime()
>>>>>>           .timestampsFromField("rowTime")
>>>>>>           .watermarksPeriodicBounded(100)
>>>>>>       );
>>>>>>     tEnv.connect(connectorDescriptor)
>>>>>>       .withFormat(formatDescriptor)
>>>>>>       .withSchema(schemaDescriptor)
>>>>>>       .inAppendMode()
>>>>>>       .registerTableSource("mysrc");
>>>>>>     return tEnv.scan("mysrc");
>>>>>>   }
>>>>>
>>>>>

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Rong,

I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and will update once I find anything.

Thanks a lot for spending your time on this.

However from what you explained, if I understand correctly you can do all
> of your processing within the TableAPI scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement and connect with the table
> API via UserDefinedFunction[1].
> As TableAPI becoming the first class citizen [2,3,4], this would be much
> cleaner implementation from my perspective.

I also agree with you in that the first class citizen Table API will make
everything not only easier but also a lot cleaner.
We however contain some corner cases that force us to covert Table from and
to DataStream.
One such case is to append to Table a column showing the current watermark
of each record; there's no other way but to do that as ScalarFunction
doesn't allow us to get the runtime context information as ProcessFunction
does.

I have a question regarding the conversion.
Do I have to worry about runtime performance penalty in case that I cannot
help but convert back and fourth to DataStream?

Best,

Dongwon

On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <wa...@gmail.com> wrote:

> Hi Dongwon,
>
> I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and will update once I find anything.
>
> However from what you explained, if I understand correctly you can do all
> of your processing within the TableAPI scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some sort of map
> function that's simple enough, you can implement and connect with the table
> API via UserDefinedFunction[1].
> As TableAPI becoming the first class citizen [2,3,4], this would be much
> cleaner implementation from my perspective.
>
> --
> Rong
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
> [2]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
> [3]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
> [4]
> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>
>
> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com> wrote:
>
>> Hi Rong,
>>
>> Thank you for reply :-)
>>
>> which Flink version are you using?
>>
>> I'm using Flink-1.8.0.
>>
>> what is the "sourceTable.getSchema().toRowType()" return?
>>
>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>
>> what is the line *".map(a -> a)" *do and can you remove it?
>>
>> *".map(a->a)"* is just to illustrate a problem.
>> My actual code contains a process function (instead of .map() in the
>> snippet) which appends a new field containing watermark to a row.
>> If there were ways to get watermark inside a scalar UDF, I wouldn't
>> convert table to datastream and vice versa.
>>
>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime, is that want your intension is to use it later as well?
>>
>> yup :-)
>>
>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>>> adds a type information hint about the return type of this operator. It is
>>> used in cases where Flink cannot determine automatically[1].
>>
>> The reason why I specify
>> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
>> information hint as you said.
>> That is needed later when I need to make another table like
>>    "*Table anotherTable = tEnv.fromDataStream(stream);"*,
>> Without the type information hint, I've got an error
>>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
>> Please specify the type of the input with a RowTypeInfo."*
>> That's why I give a type information hint in that way.
>>
>> Best,
>>
>> Dongwon
>>
>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com> wrote:
>>
>>> Hi Dongwon,
>>>
>>> Can you provide a bit more information:
>>> which Flink version are you using?
>>> what is the "sourceTable.getSchema().toRowType()" return?
>>> what is the line *".map(a -> a)" *do and can you remove it?
>>> if I am understanding correctly, you are also using "time1" as the
>>> rowtime, is that want your intension is to use it later as well?
>>>
>>> As far as I know *".returns(sourceTable.getSchema().toRowType());"*
>>> only adds a type information hint about the return type of this operator.
>>> It is used in cases where Flink cannot determine automatically[1].
>>>
>>> Thanks,
>>> Rong
>>>
>>> --
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>>
>>>
>>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com>
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Consider the following snippet:
>>>>
>>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable,
>>>>> Row.class)
>>>>>
>>>>> *      .map(a -> a)
>>>>> .returns(sourceTable.getSchema().toRowType());*
>>>>>     stream.print();
>>>>>
>>>> where sourceTable.printSchema() shows:
>>>>
>>>>> root
>>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>>
>>>>
>>>>
>>>>  This program returns the following exception:
>>>>
>>>>> Exception in thread "main"
>>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>>> at
>>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>>> at
>>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>>> at
>>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>>> at
>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>>>>> cast to java.lang.Long*
>>>>> * at
>>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>>> at
>>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>>> ...
>>>>
>>>>
>>>> The row serializer seems to try to deep-copy an instance of
>>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>>> Could anybody help me?
>>>>
>>>> Best,
>>>>
>>>> - Dongwon
>>>>
>>>> p.s. though removing .returns() makes everything okay, I need to do
>>>> that as I want to convert DataStream<Row> into another table later.
>>>> p.s. the source table is created as follows:
>>>>
>>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv)
>>>>> {
>>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>>       .version("universal")
>>>>>       .topic("mytopic")
>>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>>       .property("group.id", "mygroup")
>>>>>       .startFromEarliest();
>>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>>       .deriveSchema()
>>>>>       .ignoreParseErrors()
>>>>>       .fieldDelimiter(',');
>>>>>     Schema schemaDescriptor = new Schema()
>>>>>       .field("time1", SQL_TIMESTAMP())
>>>>>       .rowtime(
>>>>>         new Rowtime()
>>>>>           .timestampsFromField("rowTime")
>>>>>           .watermarksPeriodicBounded(100)
>>>>>       );
>>>>>     tEnv.connect(connectorDescriptor)
>>>>>       .withFormat(formatDescriptor)
>>>>>       .withSchema(schemaDescriptor)
>>>>>       .inAppendMode()
>>>>>       .registerTableSource("mysrc");
>>>>>     return tEnv.scan("mysrc");
>>>>>   }
>>>>
>>>>

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Rong Rong <wa...@gmail.com>.
Hi Dongwon,

I have to dig deeper into the code to reproduce this error. This seems to
be a bug to me and will update once I find anything.

However from what you explained, if I understand correctly you can do all
of your processing within the TableAPI scope without converting it back and
forth to DataStream.
E.g. if your "map(a -> a)" placeholder represents some sort of map function
that's simple enough, you can implement and connect with the table API via
UserDefinedFunction[1].
As TableAPI becoming the first class citizen [2,3,4], this would be much
cleaner implementation from my perspective.

--
Rong

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html


On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Rong,
>
> Thank you for reply :-)
>
> which Flink version are you using?
>
> I'm using Flink-1.8.0.
>
> what is the "sourceTable.getSchema().toRowType()" return?
>
> Row(time1: TimeIndicatorTypeInfo(rowtime))
>
> what is the line *".map(a -> a)" *do and can you remove it?
>
> *".map(a->a)"* is just to illustrate a problem.
> My actual code contains a process function (instead of .map() in the
> snippet) which appends a new field containing watermark to a row.
> If there were ways to get watermark inside a scalar UDF, I wouldn't
> convert table to datastream and vice versa.
>
> if I am understanding correctly, you are also using "time1" as the
>> rowtime, is that want your intension is to use it later as well?
>
> yup :-)
>
> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine automatically[1].
>
> The reason why I specify
> *".returns(sourceTable.getSchema().toRowType());"* is to give a type
> information hint as you said.
> That is needed later when I need to make another table like
>    "*Table anotherTable = tEnv.fromDataStream(stream);"*,
> Without the type information hint, I've got an error
>    "*An input of GenericTypeInfo<Row> cannot be converted to Table.
> Please specify the type of the input with a RowTypeInfo."*
> That's why I give a type information hint in that way.
>
> Best,
>
> Dongwon
>
> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com> wrote:
>
>> Hi Dongwon,
>>
>> Can you provide a bit more information:
>> which Flink version are you using?
>> what is the "sourceTable.getSchema().toRowType()" return?
>> what is the line *".map(a -> a)" *do and can you remove it?
>> if I am understanding correctly, you are also using "time1" as the
>> rowtime, is that want your intension is to use it later as well?
>>
>> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
>> adds a type information hint about the return type of this operator. It is
>> used in cases where Flink cannot determine automatically[1].
>>
>> Thanks,
>> Rong
>>
>> --
>> [1]
>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>>
>>
>> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> Consider the following snippet:
>>>
>>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>>
>>>> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>>>>     stream.print();
>>>>
>>> where sourceTable.printSchema() shows:
>>>
>>>> root
>>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>>
>>>
>>>
>>>  This program returns the following exception:
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>>> at
>>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>>> at
>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>>> at app.metatron.test.Main2.main(Main2.java:231)
>>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>>>> cast to java.lang.Long*
>>>> * at
>>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>>> at
>>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>>> ...
>>>
>>>
>>> The row serializer seems to try to deep-copy an instance of
>>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>>> Could anybody help me?
>>>
>>> Best,
>>>
>>> - Dongwon
>>>
>>> p.s. though removing .returns() makes everything okay, I need to do that
>>> as I want to convert DataStream<Row> into another table later.
>>> p.s. the source table is created as follows:
>>>
>>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>>       .version("universal")
>>>>       .topic("mytopic")
>>>>       .property("bootstrap.servers", "localhost:9092")
>>>>       .property("group.id", "mygroup")
>>>>       .startFromEarliest();
>>>>     FormatDescriptor formatDescriptor = new Csv()
>>>>       .deriveSchema()
>>>>       .ignoreParseErrors()
>>>>       .fieldDelimiter(',');
>>>>     Schema schemaDescriptor = new Schema()
>>>>       .field("time1", SQL_TIMESTAMP())
>>>>       .rowtime(
>>>>         new Rowtime()
>>>>           .timestampsFromField("rowTime")
>>>>           .watermarksPeriodicBounded(100)
>>>>       );
>>>>     tEnv.connect(connectorDescriptor)
>>>>       .withFormat(formatDescriptor)
>>>>       .withSchema(schemaDescriptor)
>>>>       .inAppendMode()
>>>>       .registerTableSource("mysrc");
>>>>     return tEnv.scan("mysrc");
>>>>   }
>>>
>>>

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Rong,

Thank you for reply :-)

which Flink version are you using?

I'm using Flink-1.8.0.

what is the "sourceTable.getSchema().toRowType()" return?

Row(time1: TimeIndicatorTypeInfo(rowtime))

what is the line *".map(a -> a)" *do and can you remove it?

*".map(a->a)"* is just to illustrate a problem.
My actual code contains a process function (instead of .map() in the
snippet) which appends a new field containing watermark to a row.
If there were ways to get watermark inside a scalar UDF, I wouldn't convert
table to datastream and vice versa.

if I am understanding correctly, you are also using "time1" as the rowtime,
> is that want your intension is to use it later as well?

yup :-)

As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine automatically[1].

The reason why I specify *".returns(sourceTable.getSchema().toRowType());"* is
to give a type information hint as you said.
That is needed later when I need to make another table like
   "*Table anotherTable = tEnv.fromDataStream(stream);"*,
Without the type information hint, I've got an error
   "*An input of GenericTypeInfo<Row> cannot be converted to Table. Please
specify the type of the input with a RowTypeInfo."*
That's why I give a type information hint in that way.

Best,

Dongwon

On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com> wrote:

> Hi Dongwon,
>
> Can you provide a bit more information:
> which Flink version are you using?
> what is the "sourceTable.getSchema().toRowType()" return?
> what is the line *".map(a -> a)" *do and can you remove it?
> if I am understanding correctly, you are also using "time1" as the
> rowtime, is that want your intension is to use it later as well?
>
> As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine automatically[1].
>
> Thanks,
> Rong
>
> --
> [1]
> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
>
>
> On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com> wrote:
>
>> Hello,
>>
>> Consider the following snippet:
>>
>>>     Table sourceTable = getKafkaSource0(tEnv);
>>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>>
>>> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>>>     stream.print();
>>>
>> where sourceTable.printSchema() shows:
>>
>>> root
>>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>>
>>
>>
>>  This program returns the following exception:
>>
>>> Exception in thread "main"
>>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>> at
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>>> at
>>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>>> at
>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>>> at
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>>> at app.metatron.test.Main2.main(Main2.java:231)
>>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>>> cast to java.lang.Long*
>>> * at
>>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>>> at
>>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>>> ...
>>
>>
>> The row serializer seems to try to deep-copy an instance of
>> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
>> Could anybody help me?
>>
>> Best,
>>
>> - Dongwon
>>
>> p.s. though removing .returns() makes everything okay, I need to do that
>> as I want to convert DataStream<Row> into another table later.
>> p.s. the source table is created as follows:
>>
>> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>>       .version("universal")
>>>       .topic("mytopic")
>>>       .property("bootstrap.servers", "localhost:9092")
>>>       .property("group.id", "mygroup")
>>>       .startFromEarliest();
>>>     FormatDescriptor formatDescriptor = new Csv()
>>>       .deriveSchema()
>>>       .ignoreParseErrors()
>>>       .fieldDelimiter(',');
>>>     Schema schemaDescriptor = new Schema()
>>>       .field("time1", SQL_TIMESTAMP())
>>>       .rowtime(
>>>         new Rowtime()
>>>           .timestampsFromField("rowTime")
>>>           .watermarksPeriodicBounded(100)
>>>       );
>>>     tEnv.connect(connectorDescriptor)
>>>       .withFormat(formatDescriptor)
>>>       .withSchema(schemaDescriptor)
>>>       .inAppendMode()
>>>       .registerTableSource("mysrc");
>>>     return tEnv.scan("mysrc");
>>>   }
>>
>>

Re: [Table API] ClassCastException when converting a table to DataStream

Posted by Rong Rong <wa...@gmail.com>.
Hi Dongwon,

Can you provide a bit more information:
which Flink version are you using?
what is the "sourceTable.getSchema().toRowType()" return?
what is the line *".map(a -> a)" *do and can you remove it?
if I am understanding correctly, you are also using "time1" as the rowtime,
is that want your intension is to use it later as well?

As far as I know *".returns(sourceTable.getSchema().toRowType());"* only
adds a type information hint about the return type of this operator. It is
used in cases where Flink cannot determine automatically[1].

Thanks,
Rong

--
[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351


On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com> wrote:

> Hello,
>
> Consider the following snippet:
>
>>     Table sourceTable = getKafkaSource0(tEnv);
>>     DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>>
>> *      .map(a -> a)      .returns(sourceTable.getSchema().toRowType());*
>>     stream.print();
>>
> where sourceTable.printSchema() shows:
>
>> root
>>  |-- time1: TimeIndicatorTypeInfo(rowtime)
>
>
>
>  This program returns the following exception:
>
>> Exception in thread "main"
>> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> at
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
>> at
>> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
>> at
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
>> at app.metatron.test.Main2.main(Main2.java:231)
>> *Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
>> cast to java.lang.Long*
>> * at
>> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)*
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
>> at
>> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
>> ...
>
>
> The row serializer seems to try to deep-copy an instance of
> java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
> Could anybody help me?
>
> Best,
>
> - Dongwon
>
> p.s. though removing .returns() makes everything okay, I need to do that
> as I want to convert DataStream<Row> into another table later.
> p.s. the source table is created as follows:
>
> private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>>     ConnectorDescriptor connectorDescriptor = new Kafka()
>>       .version("universal")
>>       .topic("mytopic")
>>       .property("bootstrap.servers", "localhost:9092")
>>       .property("group.id", "mygroup")
>>       .startFromEarliest();
>>     FormatDescriptor formatDescriptor = new Csv()
>>       .deriveSchema()
>>       .ignoreParseErrors()
>>       .fieldDelimiter(',');
>>     Schema schemaDescriptor = new Schema()
>>       .field("time1", SQL_TIMESTAMP())
>>       .rowtime(
>>         new Rowtime()
>>           .timestampsFromField("rowTime")
>>           .watermarksPeriodicBounded(100)
>>       );
>>     tEnv.connect(connectorDescriptor)
>>       .withFormat(formatDescriptor)
>>       .withSchema(schemaDescriptor)
>>       .inAppendMode()
>>       .registerTableSource("mysrc");
>>     return tEnv.scan("mysrc");
>>   }
>
>