Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2019/07/17 08:29:00 UTC
[Table API] ClassCastException when converting a table to DataStream
Hello,
Consider the following snippet:
> Table sourceTable = getKafkaSource0(tEnv);
> DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
>     .map(a -> a)
>     .returns(sourceTable.getSchema().toRowType());
> stream.print();
>
where sourceTable.printSchema() shows:
> root
> |-- time1: TimeIndicatorTypeInfo(rowtime)
This program returns the following exception:
> Exception in thread "main"
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:638)
> at
> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1509)
> at app.metatron.test.Main2.main(Main2.java:231)
> Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be
> cast to java.lang.Long
> at
> org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
> at
> org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
> ...
The row serializer seems to try to deep-copy an instance of
java.sql.Timestamp using LongSerializer instead of SqlTimestampSerializer.
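The failure mode can be reproduced outside Flink with a toy model (this is illustrative plain Java, not Flink code): the serializer is chosen from the declared field type, which for a rowtime indicator is a long, while the value actually stored in the Row is a java.sql.Timestamp, so the blind cast inside copy() fails:

```java
import java.sql.Timestamp;

// Toy model of the failure, not Flink code: the copier is chosen from the
// declared field type (long for a rowtime indicator), but the value stored in
// the row at runtime is a java.sql.Timestamp, so the cast fails.
public class TimeIndicatorMismatch {

    interface FieldCopier { Object copy(Object value); }

    // Mirrors LongSerializer.copy(Object), which casts its argument to Long.
    static final FieldCopier LONG_COPIER = value -> (Long) value;

    static String tryCopy(Object fieldValue) {
        try {
            LONG_COPIER.copy(fieldValue);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(tryCopy(new Timestamp(0L))); // ClassCastException
        System.out.println(tryCopy(42L));               // ok
    }
}
```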
Could anybody help me?
Best,
- Dongwon
p.s. Though removing .returns() makes everything okay, I need to keep it
because I want to convert the DataStream<Row> into another table later.
p.s. the source table is created as follows:
private static final Table getKafkaSource0(StreamTableEnvironment tEnv) {
>     ConnectorDescriptor connectorDescriptor = new Kafka()
>         .version("universal")
>         .topic("mytopic")
>         .property("bootstrap.servers", "localhost:9092")
>         .property("group.id", "mygroup")
>         .startFromEarliest();
>     FormatDescriptor formatDescriptor = new Csv()
>         .deriveSchema()
>         .ignoreParseErrors()
>         .fieldDelimiter(',');
>     Schema schemaDescriptor = new Schema()
>         .field("time1", SQL_TIMESTAMP())
>         .rowtime(
>             new Rowtime()
>                 .timestampsFromField("rowTime")
>                 .watermarksPeriodicBounded(100)
>         );
>     tEnv.connect(connectorDescriptor)
>         .withFormat(formatDescriptor)
>         .withSchema(schemaDescriptor)
>         .inAppendMode()
>         .registerTableSource("mysrc");
>     return tEnv.scan("mysrc");
> }
Re: [Table API] ClassCastException when converting a table to DataStream
Posted by Rong Rong <wa...@gmail.com>.
Hi Dongwon,
Sorry for the late reply. I ran some experiments and it seems you are
right:
Setting the `.returns()` type actually alters the underlying type of the
DataStream from a GenericType into a specific RowTypeInfo. Please see the
JIRA ticket [1] for more info.
Regarding the approach: yes, you cannot access the timer service from the
Table/SQL API at the moment, so that might be the best approach.
And as Fabian suggested, I don't think there's much of a problem as long as
you are not changing the underlying type info of your DataStream. I will
follow up in the JIRA ticket.
--
Rong
[1] https://issues.apache.org/jira/browse/FLINK-13389
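Until the fix lands, one workaround shape (sketched here with plain strings as stand-ins; this is not the Flink API, and the helper below is hypothetical) is to rewrite the schema-derived type hint so that time-indicator fields are declared as plain SQL timestamps, which then matches the java.sql.Timestamp values that actually flow through the stream:

```java
import java.util.Arrays;

// Hypothetical sketch, not the Flink API: before reusing a table schema as a
// .returns() hint, rewrite any time-indicator field back to a plain timestamp
// type so the chosen serializer matches the java.sql.Timestamp values.
public class RewriteTypeHint {

    static String[] rewrite(String[] fieldTypes) {
        return Arrays.stream(fieldTypes)
                // rowtime/proctime indicators become plain SQL timestamps
                .map(t -> t.startsWith("TimeIndicatorTypeInfo") ? "SQL_TIMESTAMP" : t)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        String[] hint = rewrite(new String[] {"TimeIndicatorTypeInfo(rowtime)", "STRING"});
        System.out.println(Arrays.toString(hint)); // [SQL_TIMESTAMP, STRING]
    }
}
```

In actual Flink 1.8 code the analogous move would be building a new RowTypeInfo whose indicator fields are replaced with Types.SQL_TIMESTAMP before passing it to .returns() — but treat that as an assumption to verify against the JIRA ticket rather than a confirmed fix.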
>>>
>>> On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <wa...@gmail.com> wrote:
>>>
>>>> Hi Dongwon,
>>>>
>>>> I have to dig deeper into the code to reproduce this error. This seems
>>>> to be a bug to me and will update once I find anything.
>>>>
>>>> However, from what you explained, if I understand correctly you can do
>>>> all of your processing within the Table API scope without converting back
>>>> and forth to DataStream.
>>>> E.g. if your "map(a -> a)" placeholder represents some map function that's
>>>> simple enough, you can implement it as a user-defined function and connect
>>>> it with the Table API [1].
>>>> As the Table API becomes a first-class citizen [2,3,4], this would be a
>>>> much cleaner implementation from my perspective.
>>>>
>>>> --
>>>> Rong
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
>>>> [2]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
>>>> [3]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
>>>> [4]
>>>> http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
>>>>
>>>>
>>>> On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Rong,
>>>>>
>>>>> Thank you for reply :-)
>>>>>
>>>>> which Flink version are you using?
>>>>>
>>>>> I'm using Flink-1.8.0.
>>>>>
>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>>
>>>>> Row(time1: TimeIndicatorTypeInfo(rowtime))
>>>>>
>>>>> what does the line ".map(a -> a)" do and can you remove it?
>>>>>
>>>>> ".map(a->a)" is just to illustrate the problem.
>>>>> My actual code contains a process function (instead of the .map() in the
>>>>> snippet) which appends to each row a new field containing the watermark.
>>>>> If there were a way to get the watermark inside a scalar UDF, I wouldn't
>>>>> convert the table to a DataStream and vice versa.
>>>>>
>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime; is your intention to use it later as well?
>>>>>
>>>>> yup :-)
>>>>>
>>>>> As far as I know ".returns(sourceTable.getSchema().toRowType());" only
>>>>>> adds a type information hint about the return type of this operator. It is
>>>>>> used in cases where Flink cannot determine it automatically[1].
>>>>>
>>>>> The reason why I specify
>>>>> ".returns(sourceTable.getSchema().toRowType());" is to give a type
>>>>> information hint, as you said.
>>>>> It is needed later when I make another table like
>>>>> "Table anotherTable = tEnv.fromDataStream(stream);".
>>>>> Without the type information hint, I get the error
>>>>> "An input of GenericTypeInfo<Row> cannot be converted to Table.
>>>>> Please specify the type of the input with a RowTypeInfo."
>>>>> That's why I give the type information hint in that way.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dongwon
>>>>>
>>>>> On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Dongwon,
>>>>>>
>>>>>> Can you provide a bit more information:
>>>>>> which Flink version are you using?
>>>>>> what is the "sourceTable.getSchema().toRowType()" return?
>>>>>> what does the line ".map(a -> a)" do and can you remove it?
>>>>>> if I am understanding correctly, you are also using "time1" as the
>>>>>> rowtime; is your intention to use it later as well?
>>>>>>
>>>>>> As far as I know ".returns(sourceTable.getSchema().toRowType());"
>>>>>> only adds a type information hint about the return type of this operator.
>>>>>> It is used in cases where Flink cannot determine it automatically[1].
>>>>>>
>>>>>> Thanks,
>>>>>> Rong
>>>>>>
>>>>>> --
>>>>>> [1]
>>>>>> https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
Re: [Table API] ClassCastException when converting a table to DataStream
Posted by Dongwon Kim <ea...@gmail.com>.
Hi Fabian,
Thanks for clarification :-)
I could convert back and forth without worrying about it, as I keep using
the Row type during the conversion (even though fields are added).
Best,
Dongwon
Re: [Table API] ClassCastException when converting a table to DataStream
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Dongwon,
regarding the question about the conversion: if you keep using the Row type
and do not add or remove fields, the conversion is pretty much free right
now.
It will be a MapFunction (sometimes not even a function at all) that should
be chained with the other operators. Hence, it should boil down to a
function call.
Best, Fabian
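Fabian's point about the conversion being nearly free can be illustrated with a minimal sketch (plain Java, not Flink): an identity map chained with its neighbors forwards the very same object, so the round trip reduces to roughly one function call per record:

```java
// Minimal sketch, not Flink: a chained identity MapFunction forwards the same
// object reference, so a Table<->DataStream round trip that keeps the Row type
// costs roughly one function call per record and no copying.
public class IdentityMapCost {

    interface MapFunction<T, R> { R map(T value); }

    public static void main(String[] args) {
        MapFunction<Object[], Object[]> conversion = row -> row; // identity
        Object[] row = {"time1", 42L};
        Object[] out = conversion.map(row);
        System.out.println(out == row); // true: same reference, no copy
    }
}
```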
Re: [Table API] ClassCastException when converting a table to DataStream
Posted by Dongwon Kim <ea...@gmail.com>.
Hi Rong,
I have to dig deeper into the code to reproduce this error. This seems to
> be a bug to me and will update once I find anything.
Thanks a lot for spending your time on this.
However, from what you explained, if I understand correctly you can do all
> of your processing within the Table API scope without converting it back and
> forth to DataStream.
> E.g. if your "map(a -> a)" placeholder represents some map function that's
> simple enough, you can implement it as a user-defined function and connect it
> with the Table API [1].
> As the Table API becomes a first-class citizen [2,3,4], this would be a much
> cleaner implementation from my perspective.
I agree that making the Table API a first-class citizen will make
everything not only easier but also a lot cleaner.
We do, however, have some corner cases that force us to convert a Table to
and from a DataStream.
One such case is appending to a Table a column showing the current watermark
of each record; there's no other way to do that, as ScalarFunction
doesn't give us access to runtime context information the way ProcessFunction
does.
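The corner case above can be sketched generically (a toy model, not the Flink API; in real Flink the ProcessFunction would read ctx.timerService().currentWatermark(), which is exactly the runtime context a ScalarFunction lacks):

```java
import java.util.Arrays;

// Toy model, not the Flink API: a process-function-like step that can see the
// current watermark through its runtime context and append it as an extra row
// field -- a scalar UDF has no such context, hence the Table->DataStream detour.
public class AppendWatermark {

    interface Context { long currentWatermark(); }

    static Object[] appendWatermark(Object[] row, Context ctx) {
        Object[] out = Arrays.copyOf(row, row.length + 1);
        out[row.length] = ctx.currentWatermark(); // new trailing field
        return out;
    }

    public static void main(String[] args) {
        Object[] out = appendWatermark(new Object[] {"payload"}, () -> 1563000000000L);
        System.out.println(Arrays.toString(out)); // [payload, 1563000000000]
    }
}
```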
I have a question regarding the conversion:
do I have to worry about a runtime performance penalty in cases where I
cannot help but convert back and forth to DataStream?
Best,
Dongwon
On Sat, Jul 20, 2019 at 12:41 AM Rong Rong <wa...@gmail.com> wrote:
Re: [Table API] ClassCastException when converting a table to DataStream|
Posted by Rong Rong <wa...@gmail.com>.
Hi Dongwon,
I have to dig deeper into the code to reproduce this error. This seems to
be a bug to me, and I will update once I find anything.
However, from what you explained, if I understand correctly you can do all
of your processing within the Table API scope without converting it back and
forth to DataStream.
E.g. if your "map(a -> a)" placeholder represents some sort of map function
that's simple enough, you can implement it and connect it with the Table API
via a UserDefinedFunction [1].
As the Table API is becoming a first-class citizen [2,3,4], this would be a
much cleaner implementation from my perspective.
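For instance, a scalar UDF can be as small as this (a minimal sketch with a made-up name; see [1] for registration details):

```java
// Minimal scalar UDF sketch (hypothetical name): once registered with
// tEnv.registerFunction("hashCode", new HashCode()), it can be called
// directly in Table API expressions without leaving the Table scope.
public class HashCode extends ScalarFunction {
    public int eval(String s) {
        return s == null ? 0 : s.hashCode();
    }
}
```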
--
Rong
[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/udfs.html#scalar-functions
[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Table-API-Enhancement-Outline-td25070.html
[3]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-FLIP-36-Support-Interactive-Programming-in-Flink-Table-API-td27658.html
[4]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html
On Thu, Jul 18, 2019 at 8:42 PM Dongwon Kim <ea...@gmail.com> wrote:
Re: [Table API] ClassCastException when converting a table to DataStream|
Posted by Dongwon Kim <ea...@gmail.com>.
Hi Rong,
Thank you for reply :-)
> which Flink version are you using?
I'm using Flink 1.8.0.
> what does "sourceTable.getSchema().toRowType()" return?
Row(time1: TimeIndicatorTypeInfo(rowtime))
> what does the line *".map(a -> a)"* do, and can you remove it?
*".map(a->a)"* is just there to illustrate the problem.
My actual code contains a process function (instead of the .map() in the
snippet) which appends to each row a new field containing the watermark.
If there were a way to get the watermark inside a scalar UDF, I wouldn't
convert the table to a DataStream and back.
> if I am understanding correctly, you are also using "time1" as the rowtime;
> is it your intention to use it later as well?
yup :-)
> As far as I know, *".returns(sourceTable.getSchema().toRowType());"* only
> adds a type information hint about the return type of this operator. It is
> used in cases where Flink cannot determine it automatically [1].
The reason why I specify *".returns(sourceTable.getSchema().toRowType());"* is
to give a type information hint, as you said.
That is needed later when I make another table like
"*Table anotherTable = tEnv.fromDataStream(stream);"*.
Without the type information hint, I get the error
"*An input of GenericTypeInfo<Row> cannot be converted to Table. Please
specify the type of the input with a RowTypeInfo."*
That's why I give the type information hint that way.
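One thing I might try (an untested sketch, not something I've verified): build the RowTypeInfo myself and swap the rowtime indicator type for a plain SQL_TIMESTAMP, so the hint matches the java.sql.Timestamp values that toAppendStream actually produces:

```java
// Untested workaround sketch: replace TimeIndicatorTypeInfo with
// Types.SQL_TIMESTAMP in the hint so RowSerializer copies the rowtime field
// with SqlTimestampSerializer instead of LongSerializer.
TypeInformation<?>[] types = sourceTable.getSchema().getFieldTypes();
for (int i = 0; i < types.length; i++) {
    if (types[i] instanceof TimeIndicatorTypeInfo) {
        types[i] = Types.SQL_TIMESTAMP;  // org.apache.flink.api.common.typeinfo.Types
    }
}
RowTypeInfo hint = new RowTypeInfo(types, sourceTable.getSchema().getFieldNames());
DataStream<Row> stream = tEnv.toAppendStream(sourceTable, Row.class)
    .map(a -> a)
    .returns(hint);
```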
Best,
Dongwon
On Fri, Jul 19, 2019 at 12:39 AM Rong Rong <wa...@gmail.com> wrote:
Re: [Table API] ClassCastException when converting a table to DataStream|
Posted by Rong Rong <wa...@gmail.com>.
Hi Dongwon,
Can you provide a bit more information:
which Flink version are you using?
what does "sourceTable.getSchema().toRowType()" return?
what does the line *".map(a -> a)"* do, and can you remove it?
if I am understanding correctly, you are also using "time1" as the rowtime;
is it your intention to use it later as well?
As far as I know, *".returns(sourceTable.getSchema().toRowType());"* only
adds a type information hint about the return type of this operator. It is
used in cases where Flink cannot determine it automatically [1].
Thanks,
Rong
--
[1]
https://github.com/apache/flink/blob/release-1.8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java#L351
On Wed, Jul 17, 2019 at 1:29 AM Dongwon Kim <ea...@gmail.com> wrote:
>