Posted to issues@flink.apache.org by "MengYao (Jira)" <ji...@apache.org> on 2020/12/03 09:00:00 UTC

[jira] [Comment Edited] (FLINK-20463) flink-1.11.2 -sql cannot ignore exception record

    [ https://issues.apache.org/jira/browse/FLINK-20463?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17243021#comment-17243021 ] 

MengYao edited comment on FLINK-20463 at 12/3/20, 8:59 AM:
-----------------------------------------------------------

 

*I have a similar problem: I defined a Kafka dynamic table in SQL-Client, but because some elements in the Kafka topic are incorrectly formatted, the query fails with an exception in SQL-Client. Can a configuration option be added to ignore these bad records?*

version = 1.11.2

module = Table & SQL

My Steps:

         *{color:#00875a}// 1. Enter the SQL client command line{color}*

                 $FLINK_HOME/bin/sql-client.sh embedded

         *{color:#00875a}// 2. Create the Kafka dynamic table{color}*

         *{color:#57d9a3}Flink SQL{color}>* CREATE TABLE kfk_test01 (
                  order_id BIGINT,  -- order ID
                  original_price DOUBLE,  -- amount actually paid
                  ctime BIGINT,  -- creation time
                  ts AS TO_TIMESTAMP(FROM_UNIXTIME(ctime, 'yyyy-MM-dd HH:mm:ss')),  -- derive the timestamp ts from the ctime field
                  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND  -- define a watermark on ts with a 5-second delay
                 ) WITH (
                  'connector' = 'kafka',
                  'topic' = 'test01',
                  'properties.bootstrap.servers' = 'node1:9092',
                  'properties.group.id' = 'testGroup',
                  'format' = 'json',
                  'scan.startup.mode' = 'earliest-offset'
                 );

         *{color:#00875a}// 3. Execute a query{color}*

         *{color:#57d9a3}Flink SQL>{color}* select * from kfk_test01;

        *{color:#00875a}// 4. Encountered a malformed element that causes the query to fail (the element content is NULL or empty){color}*

*!QQ截图111.jpg!*

        *// 5. Could a generic configuration option be added, similar to MapReduce's support for skipping bad records? It could apply to both the json and csv formats (see the sketches below).*

        *For example: {color:#de350b}skip.fail.records=0{color} (allowed values: 0, -1, or any number > 0)*

                   *{color:#de350b}0: the default; bad records must not be skipped, and any failure stops the job{color}*

                 *{color:#de350b}-1: all bad records may be skipped{color}*

                 *{color:#de350b}any number > 0: the maximum acceptable number of bad records{color}*
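
*To make the proposal concrete, here is a sketch of how the hypothetical option could look in the WITH clause of the table above. Note that 'skip.fail.records' does not exist in Flink; the name and its semantics are purely illustrative:*

        CREATE TABLE kfk_test01 (
         order_id BIGINT,
         original_price DOUBLE,
         ctime BIGINT
        ) WITH (
         'connector' = 'kafka',
         'topic' = 'test01',
         'properties.bootstrap.servers' = 'node1:9092',
         'properties.group.id' = 'testGroup',
         'format' = 'json',
         -- hypothetical option, not part of Flink: tolerate at most 100 bad records, then fail the job
         'skip.fail.records' = '100',
         'scan.startup.mode' = 'earliest-offset'
        );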
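
*For comparison, the closest existing knob in Flink 1.11: the JSON format option 'json.ignore-parse-errors' (it also appears in the original report quoted below) skips every record that fails to parse, which roughly matches the proposed -1 behavior; the CSV format offers the analogous 'csv.ignore-parse-errors'. A minimal sketch for the same table:*

        CREATE TABLE kfk_test01 (
         order_id BIGINT,
         original_price DOUBLE,
         ctime BIGINT
        ) WITH (
         'connector' = 'kafka',
         'topic' = 'test01',
         'properties.bootstrap.servers' = 'node1:9092',
         'properties.group.id' = 'testGroup',
         'format' = 'json',
         -- existing Flink 1.11 option: skip records that fail JSON parsing instead of failing the job
         'json.ignore-parse-errors' = 'true',
         'scan.startup.mode' = 'earliest-offset'
        );

*As the report below shows, however, 'json.ignore-parse-errors' does not cover every bad record: a message that parses as valid JSON but does not match the declared schema can still bring the job down with a NullPointerException.*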



> flink-1.11.2 -sql cannot ignore exception record
> ------------------------------------------------
>
>                 Key: FLINK-20463
>                 URL: https://issues.apache.org/jira/browse/FLINK-20463
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.11.2
>         Environment: <flink.version>1.11.2</flink.version>
> <scala.binary.version>2.11</scala.binary.version>
>            Reporter: 谢波
>            Priority: Major
>         Attachments: QQ截图111.jpg, 无标题111.png
>
>
> Can Flink SQL provide an option to ignore exception records?
> I have a table that maps Kafka data in JSON format.
> When this data is parsed, an exception is thrown, even though it is valid JSON; it is simply not a valid record.
> {color:#FF0000}offending record: {"SHEET":[""]}{color}
> {color:#FF0000}my table:{color}
> CREATE TABLE offline
> (
>   SHEET ROW (
>     HEADER MAP < STRING, STRING >,
>     ITEM ROW (
>       AMOUNT STRING,
>       COST STRING,
>       GOODSID STRING,
>       SALEVALUE STRING,
>       SAP_RTMATNR STRING,
>       SAP_RTPLU STRING,
>       SERIALID STRING,
>       SHEETID STRING
>     ) ARRAY,
>     ITEM5 MAP < STRING, STRING > ARRAY,
>     ITEM1 MAP < STRING, STRING > ARRAY,
>     TENDER MAP < STRING, STRING > ARRAY
>   ) ARRAY
> )
> WITH (
>   'connector' = 'kafka',
>   'properties.bootstrap.servers' = 'xxx:9092',
>   'properties.group.id' = 'realtime.sales.offline.group',
>   'topic' = 'bms133',
>   'format' = 'json',
>   {color:#FF0000}'json.ignore-parse-errors' = 'true',{color}
>   'scan.startup.mode' = 'earliest-offset'
> );
> {color:#FF0000}exception:{color}
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:116)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:129)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
>     at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:51)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
>     at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>     at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)