You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Polarisary <po...@gmail.com> on 2019/11/01 07:49:50 UTC
Flink 1.9 Sql Rowtime Error
Hi All:
I have define kafka connector Descriptor, and registe Table
tEnv.connect(new Kafka()
.version("universal")
.topic(tableName)
.startFromEarliest()
.property("zookeeper.connect", “xxx")
.property("bootstrap.servers", “xxx")
.property("group.id", “xxx"))
.withFormat(new Json().deriveSchema())
.withSchema(new Schema()
.field("rowtime", Types.SQL_TIMESTAMP)
.rowtime(new Rowtime()
.timestampsFromField("createTime")
.watermarksPeriodicBounded(300_000))
.field("data", Types.ROW(dataFieldTypes)))
.inAppendMode().registerTableSource(tableName);
kafka input is:
{
"data": [
18140781,
],
"createTime": 1572577137596
}
Exception as follows:
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: java.time.format.DateTimeParseException: Text '1553080631582' could not be parsed at index 0
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more
polarisary@gmail.com
Re: Flink 1.9 Sql Rowtime Error
Posted by OpenInx <op...@gmail.com>.
Hi Polarisary.
Checked the flink codebase and your stacktraces, seems you need to format
the timestamp as : "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"
The code is here:
https://github.com/apache/flink/blob/38e4e2b8f9bc63a793a2bddef5a578e3f80b7376/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonRowDeserializationSchema.java#L340
On Fri, Nov 1, 2019 at 3:50 PM Polarisary <po...@gmail.com> wrote:
> Hi All:
> I have define kafka connector Descriptor, and registe Table
>
> tEnv.connect(new Kafka()
> .version("universal")
> .topic(tableName)
> .startFromEarliest()
> .property("zookeeper.connect", *“*xxx")
> .property("bootstrap.servers", *“*xxx")
> .property("group.id", *“*xxx"))
> .withFormat(new Json().deriveSchema())
> .withSchema(new Schema()
>
> .field("rowtime", Types.SQL_TIMESTAMP)
> .rowtime(new Rowtime()
> .timestampsFromField("createTime")
> .watermarksPeriodicBounded(300_000))
> .field("data", Types.ROW(dataFieldTypes)))
>
> .inAppendMode().registerTableSource(tableName);
>
>
>
> kafka input is:
> {
>
> "data": [
> 18140781,
> ],
> "createTime": 1572577137596
> }
>
> Exception as follows:
> Caused by: java.io.IOException: Failed to deserialize JSON object.
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
> Caused by: java.time.format.DateTimeParseException: Text '1553080631582'
> could not be parsed at index 0
> at
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
> ... 7 more
>
> polarisary@gmail.com
>
>
>
>
>