Posted to user@flink.apache.org by maidangdang <ma...@126.com> on 2019/05/15 14:49:49 UTC

FlinkSQL fails when rowtime meets dirty data

I use FlinkSQL to process Kafka data in the following format:
|  id |  server_time |
|  1  | 2019-05-15 10:00:00 |
|  2  | 2019-05-15 10:00:00 |
.......


and I define rowtime from the server_time field:
new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime().timestampsFromField("server_time"))
    .field("id", Types.STRING)
    .field("server_time", Types.STRING)

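For reference, here is roughly how this schema is wired into a table source
(a minimal sketch, assuming the universal Kafka connector and JSON-encoded
records; the topic, bootstrap servers, registered table name, and watermark
strategy are illustrative placeholders, not taken from my actual job):

import org.apache.flink.table.api.Types;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Schema;

tableEnv.connect(
        new Kafka()
            .version("universal")
            .topic("events")                                   // placeholder
            .property("bootstrap.servers", "localhost:9092"))  // placeholder
    .withFormat(new Json().deriveSchema())  // format derived from the schema
    .withSchema(new Schema()
        .field("rowtime", Types.SQL_TIMESTAMP)
            .rowtime(new Rowtime()
                .timestampsFromField("server_time")
                .watermarksPeriodicBounded(60000))  // assumed strategy
        .field("id", Types.STRING)
        .field("server_time", Types.STRING))
    .inAppendMode()
    .registerTableSource("events");                 // placeholder table name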

When dirty data arrives, such as:
|  id   |  server_time |
|  99  | 11.22.33.44  |


my FlinkSQL job fails with the following exception:
java.lang.NumberFormatException: For input string: "11.22.33.44"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625)
at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715)
at DataStreamSourceConversion$288.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
at java.lang.Thread.run(Thread.java:748)


Because my Flink job uses EXACTLY_ONCE, it restarts from the last
checkpoint, consumes the dirty record again, fails again, and keeps looping
like this. I would like to ask if there are any good ways to handle this
situation?


The Flink version I used was flink-1.7.2

Re: FlinkSQL fails when rowtime meets dirty data

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I'm afraid I don't see any solution other than touching the Flink code for
this and adding a try-catch block around the timestamp conversion.
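
For illustration, such a guard would wrap the call that fails in your stack
trace, roughly like this (a sketch only; the real conversion happens inside
Flink's generated DataStreamSourceConversion code, and this helper name is
hypothetical):

import org.apache.calcite.avatica.util.DateTimeUtils;

// Hypothetical helper; the actual change would go into Flink's generated
// source-conversion code, not user code.
static Long timestampOrNull(String serverTime) {
    try {
        // this is the call that throws NumberFormatException in the trace
        return DateTimeUtils.timestampStringToUnixDate(serverTime);
    } catch (NumberFormatException e) {
        return null; // caller drops (or side-outputs) the dirty row
    }
}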

It would be great if you could create a Jira issue reporting this problem.
IMO, we should have a configuration switch (either per table or per query)
to either
* fail on dirty data
* "log and drop" dirty rows
* (send dirty data to a side output; a sketch of this follows below)
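
Until such a switch exists, you can approximate the side-output option at
the DataStream level, before the stream is converted into a table. A
minimal sketch, assuming the records arrive as Row with server_time at
index 1 and with `raw` standing in for your source stream (both are
assumptions, since I don't know your pipeline):

import java.sql.Timestamp;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so Flink can extract the side output's type.
final OutputTag<Row> DIRTY = new OutputTag<Row>("dirty-rows") {};

SingleOutputStreamOperator<Row> clean = raw.process(
    new ProcessFunction<Row, Row>() {
        @Override
        public void processElement(Row row, Context ctx, Collector<Row> out) {
            try {
                // validate server_time before the table conversion sees it
                Timestamp.valueOf((String) row.getField(1));
                out.collect(row);
            } catch (IllegalArgumentException e) {
                ctx.output(DIRTY, row); // dirty rows go to the side output
            }
        }
    });

DataStream<Row> dirty = clean.getSideOutput(DIRTY);

The dirty stream can then be logged or written to a dead-letter topic,
which also gives you the "log and drop" behavior.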

Best, Fabian
