Posted to dev@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2020/05/20 15:53:00 UTC

[jira] [Created] (FLINK-17847) ArrayIndexOutOfBoundsException thrown in code-generated StreamExec operator

Leonard Xu created FLINK-17847:
----------------------------------

             Summary: ArrayIndexOutOfBoundsException thrown in code-generated StreamExec operator
                 Key: FLINK-17847
                 URL: https://issues.apache.org/jira/browse/FLINK-17847
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.10.0, 1.11.0
            Reporter: Leonard Xu
             Fix For: 1.11.0, 1.10.0


User's query:

{code:sql}
-- source table
create table json_table (
  w_es BIGINT,
  w_type STRING,
  w_isDdl BOOLEAN,
  w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>,
  w_ts TIMESTAMP(3),
  w_table STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = '0.10',
  'connector.topic' = 'json-test2',
  'connector.properties.zookeeper.connect' = 'localhost:2181',
  'connector.properties.bootstrap.servers' = 'localhost:9092',
  'connector.properties.group.id' = 'test-jdbc',
  'connector.startup-mode' = 'earliest-offset',
  'format.type' = 'json',
  'format.derive-schema' = 'true'
)

-- real data:
{"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"}

-- query
select w_ts, 'test' as city1_id, w_data[0].pay_info as cate3_id,
  w_data as pay_order_id
from json_table
{code}

Exception:
{code:java}
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1427848
    at org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598)
    at org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590)
    at org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534)
    at org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117)
    at StreamExecCalc$10.processElement(Unknown Source)
{code}

It looks like some operation in the code-generated StreamExecCalc$10 operator accesses index '-1', which must be wrong: SQL array subscripts are 1-based, so the query's w_data[0] is generated as ((int) 0) - 1 = -1 (see the generated operator below). This bug exists in both 1.10 and 1.11.
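For comparison, here is only a workaround sketch (it does not address the missing bounds handling in the generated code): assuming the intent was to read the first array element, using the 1-based subscript w_data[1] avoids the -1 access.

{code:sql}
-- workaround sketch only: SQL subscripts start at 1, so w_data[1] is the first element
select w_ts,
       'test' as city1_id,
       w_data[1].pay_info as cate3_id,
       w_data as pay_order_id
from json_table
{code}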

 
{code:java}
public class StreamExecCalc$10 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
    implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

    private final Object[] references;

    private final org.apache.flink.table.dataformat.BinaryString str$3 = org.apache.flink.table.dataformat.BinaryString.fromString("test");

    private transient org.apache.flink.table.runtime.typeutils.BaseArraySerializer typeSerializer$5;
    final org.apache.flink.table.dataformat.BoxedWrapperRow out = new org.apache.flink.table.dataformat.BoxedWrapperRow(4);
    private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

    public StreamExecCalc$10(
        Object[] references,
        org.apache.flink.streaming.runtime.tasks.StreamTask task,
        org.apache.flink.streaming.api.graph.StreamConfig config,
        org.apache.flink.streaming.api.operators.Output output) throws Exception {
        this.references = references;
        typeSerializer$5 = (((org.apache.flink.table.runtime.typeutils.BaseArraySerializer) references[0]));
        this.setup(task, config, output);
    }

    @Override
    public void open() throws Exception {
        super.open();

    }

    @Override
    public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
        org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) element.getValue();

        org.apache.flink.table.dataformat.SqlTimestamp field$2;
        boolean isNull$2;
        org.apache.flink.table.dataformat.BaseArray field$4;
        boolean isNull$4;
        org.apache.flink.table.dataformat.BaseArray field$6;
        org.apache.flink.table.dataformat.BinaryString field$8;
        boolean isNull$8;
        org.apache.flink.table.dataformat.BinaryString result$9;
        boolean isNull$9;

        isNull$2 = in1.isNullAt(4);
        field$2 = null;
        if (!isNull$2) {
            field$2 = in1.getTimestamp(4, 3);
        }

        isNull$4 = in1.isNullAt(3);
        field$4 = null;
        if (!isNull$4) {
            field$4 = in1.getArray(3);
        }
        field$6 = field$4;
        if (!isNull$4) {
            field$6 = (org.apache.flink.table.dataformat.BaseArray) (typeSerializer$5.copy(field$6));
        }
        out.setHeader(in1.getHeader());
        if (isNull$2) {
            out.setNullAt(0);
        } else {
            out.setNonPrimitiveValue(0, field$2);
        }

        if (false) {
            out.setNullAt(1);
        } else {
            out.setNonPrimitiveValue(1, ((org.apache.flink.table.dataformat.BinaryString) str$3));
        }
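        // NOTE: the two statements below appear to implement "w_data[0].pay_info" from the query;
        // the SQL subscript 0 is code-generated as ((int) 0) - 1 = -1, so isNullAt(-1) and
        // getRow(-1, 4) hand a bogus null-bit offset to SegmentsUtil, which is where the
        // ArrayIndexOutOfBoundsException in the stack trace above is thrown.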

        boolean isNull$7 = isNull$4 || false || field$6.isNullAt(((int) 0) - 1);
        org.apache.flink.table.dataformat.BaseRow result$7 = isNull$7 ? null : field$6.getRow(((int) 0) - 1, 4);

        if (isNull$7) {
            result$9 = org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8;
            isNull$9 = true;
        }
        else {
            isNull$8 = result$7.isNullAt(0);
            field$8 = org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8;
            if (!isNull$8) {
                field$8 = result$7.getString(0);
            }
            result$9 = field$8;
            isNull$9 = isNull$8;
        }

        if (isNull$9) {
            out.setNullAt(2);
        } else {
            out.setNonPrimitiveValue(2, result$9);
        }

        if (isNull$4) {
            out.setNullAt(3);
        } else {
            out.setNonPrimitiveValue(3, field$6);
        }
        output.collect(outElement.replace(out));
   }

    @Override
    public void close() throws Exception {
        super.close();

    }
}
    
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)