Posted to issues@flink.apache.org by "Mai (JIRA)" <ji...@apache.org> on 2019/06/19 08:59:00 UTC

[jira] [Updated] (FLINK-12898) FlinkSQL job fails when rowTime field encounters dirty data

     [ https://issues.apache.org/jira/browse/FLINK-12898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Mai updated FLINK-12898:
------------------------
    Description: 
I use FlinkSQL to process Kafka data in the following format:

| id | server_time         |
| 1  | 2019-05-15 10:00:00 |
| 2  | 2019-05-15 10:00:00 |
...
  
and I define the rowtime attribute from the server_time field:
{code:java}
new Schema()
    .field("rowtime", Types.SQL_TIMESTAMP)
        .rowtime(new Rowtime().timestampsFromField("server_time"))
    .field("id", Types.STRING)
    .field("server_time", Types.STRING)
{code}
  
when dirty data arrives, such as:

| id | server_time |
| 99 | 11.22.33.44 |
 
My FlinkSQL job fails with the following exception:
{code:java}
java.lang.NumberFormatException: For input string: "11.22.33.44"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625)
    at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715)
    at DataStreamSourceConversion$288.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
{code}
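The failing frames sit in Calcite's avatica DateTimeUtils, which the generated conversion code uses to cast the server_time string to TIMESTAMP. A minimal repro sketch of just that parse path (the Repro class is mine; the DateTimeUtils call and line numbers are from the trace above): since "11.22.33.44" contains no '-' date separator, dateStringToUnixDate hands the whole string to Integer.parseInt, which throws.
{code:java}
// Minimal repro of the parse path from the trace. Assumes
// org.apache.calcite.avatica (a flink-table dependency) is on the classpath.
import org.apache.calcite.avatica.util.DateTimeUtils;

public class Repro {
    public static void main(String[] args) {
        // "11.22.33.44" has no '-' date separator, so the whole string
        // reaches Integer.parseInt and a NumberFormatException is thrown.
        DateTimeUtils.timestampStringToUnixDate("11.22.33.44");
    }
}
{code}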

  
Because my Flink job uses EXACTLY_ONCE semantics, it restarts from the last checkpoint, consumes the dirty record again, fails again, and keeps looping like this.
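As a stopgap, the stream can be pre-filtered before the SQL cast ever runs; a minimal sketch, assuming the topic is first consumed as a DataStream of (id, server_time) tuples (the helper name and tuple layout are mine, for illustration):
{code:java}
// Sketch only: drop records whose server_time does not parse as
// "yyyy-MM-dd HH:mm:ss" before the table conversion, so the rowtime cast
// never sees a value like "11.22.33.44". Tuple layout (f0 = id,
// f1 = server_time) is illustrative.
import java.sql.Timestamp;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;

public class DirtyRowFilter {
    public static DataStream<Tuple2<String, String>> dropDirtyRows(
            DataStream<Tuple2<String, String>> raw) {
        return raw.filter(record -> {
            try {
                Timestamp.valueOf(record.f1); // same format as "2019-05-15 10:00:00"
                return true;                  // keep parsable rows
            } catch (IllegalArgumentException e) {
                return false;                 // drop dirty rows
            }
        });
    }
}
{code}
The cleaned stream could then be registered as a table with a rowtime attribute once timestamps and watermarks are assigned. That avoids the restart loop, but a runtime option to skip or null out unparsable rowtime values would still be preferable.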


> FlinkSQL job fails when rowTime field encounters dirty data
> -----------------------------------------------------------
>
>                 Key: FLINK-12898
>                 URL: https://issues.apache.org/jira/browse/FLINK-12898
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.7.2
>            Reporter: Mai
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)