You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/03/30 13:45:00 UTC

[jira] [Closed] (FLINK-25227) Comparing the equality of the same (boxed) numeric values returns false

     [ https://issues.apache.org/jira/browse/FLINK-25227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser closed FLINK-25227.
----------------------------------
    Resolution: Fixed

> Comparing the equality of the same (boxed) numeric values returns false
> -----------------------------------------------------------------------
>
>                 Key: FLINK-25227
>                 URL: https://issues.apache.org/jira/browse/FLINK-25227
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>    Affects Versions: 1.14.0, 1.12.5, 1.13.3
>            Reporter: Caizhi Weng
>            Assignee: Marios Trivyzas
>            Priority: Critical
>              Labels: pull-request-available, stale-assigned
>             Fix For: 1.15.0, 1.16.0, 1.13.7, 1.14.5
>
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
>   val data = Seq(
>     Row.of(
>       java.lang.Integer.valueOf(1000),
>       java.lang.Integer.valueOf(2000),
>       java.lang.Integer.valueOf(1000),
>       java.lang.Integer.valueOf(2000))
>   )
>   tEnv.executeSql(
>     s"""
>        |create table T (
>        |  a int,
>        |  b int,
>        |  c int,
>        |  d int
>        |) with (
>        |  'connector' = 'values',
>        |  'bounded' = 'true',
>        |  'data-id' = '${TestValuesTableFactory.registerData(data)}'
>        |)
>        |""".stripMargin)
>   tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
> }
> {code}
> The result is false, which is obviously incorrect.
> This is caused by the generated java code:
> {code:java}
> public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator
>         implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
>     private final Object[] references;
>     org.apache.flink.table.data.BoxedWrapperRowData out =
>             new org.apache.flink.table.data.BoxedWrapperRowData(1);
>     private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
>             new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
>     public StreamExecCalc$8(
>             Object[] references,
>             org.apache.flink.streaming.runtime.tasks.StreamTask task,
>             org.apache.flink.streaming.api.graph.StreamConfig config,
>             org.apache.flink.streaming.api.operators.Output output,
>             org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService)
>             throws Exception {
>         this.references = references;
>         this.setup(task, config, output);
>         if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
>             ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
>                     .setProcessingTimeService(processingTimeService);
>         }
>     }
>     @Override
>     public void open() throws Exception {
>         super.open();
>     }
>     @Override
>     public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element)
>             throws Exception {
>         org.apache.flink.table.data.RowData in1 =
>                 (org.apache.flink.table.data.RowData) element.getValue();
>         int field$0;
>         boolean isNull$0;
>         int field$1;
>         boolean isNull$1;
>         int field$3;
>         boolean isNull$3;
>         int field$4;
>         boolean isNull$4;
>         boolean isNull$6;
>         boolean result$7;
>         isNull$3 = in1.isNullAt(2);
>         field$3 = -1;
>         if (!isNull$3) {
>             field$3 = in1.getInt(2);
>         }
>         isNull$0 = in1.isNullAt(0);
>         field$0 = -1;
>         if (!isNull$0) {
>             field$0 = in1.getInt(0);
>         }
>         isNull$1 = in1.isNullAt(1);
>         field$1 = -1;
>         if (!isNull$1) {
>             field$1 = in1.getInt(1);
>         }
>         isNull$4 = in1.isNullAt(3);
>         field$4 = -1;
>         if (!isNull$4) {
>             field$4 = in1.getInt(3);
>         }
>         out.setRowKind(in1.getRowKind());
>         java.lang.Integer result$2 = field$0;
>         boolean nullTerm$2 = false;
>         if (!nullTerm$2) {
>             java.lang.Integer cur$2 = field$0;
>             if (isNull$0) {
>                 nullTerm$2 = true;
>             } else {
>                 int compareResult = result$2.compareTo(cur$2);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$2 = cur$2;
>                 }
>             }
>         }
>         if (!nullTerm$2) {
>             java.lang.Integer cur$2 = field$1;
>             if (isNull$1) {
>                 nullTerm$2 = true;
>             } else {
>                 int compareResult = result$2.compareTo(cur$2);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$2 = cur$2;
>                 }
>             }
>         }
>         if (nullTerm$2) {
>             result$2 = null;
>         }
>         java.lang.Integer result$5 = field$3;
>         boolean nullTerm$5 = false;
>         if (!nullTerm$5) {
>             java.lang.Integer cur$5 = field$3;
>             if (isNull$3) {
>                 nullTerm$5 = true;
>             } else {
>                 int compareResult = result$5.compareTo(cur$5);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$5 = cur$5;
>                 }
>             }
>         }
>         if (!nullTerm$5) {
>             java.lang.Integer cur$5 = field$4;
>             if (isNull$4) {
>                 nullTerm$5 = true;
>             } else {
>                 int compareResult = result$5.compareTo(cur$5);
>                 if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
>                     result$5 = cur$5;
>                 }
>             }
>         }
>         if (nullTerm$5) {
>             result$5 = null;
>         }
>         isNull$6 = nullTerm$2 || nullTerm$5;
>         result$7 = false;
>         if (!isNull$6) {
>             result$7 = result$2 == result$5;
>         }
>         if (isNull$6) {
>             out.setNullAt(0);
>         } else {
>             out.setBoolean(0, result$7);
>         }
>         output.collect(outElement.replace(out));
>     }
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
> }
> {code}
> You can see that line 137 compares two boxed Integer types with {{==}} instead of {{.equals}}, which causes this problem.
> In older Flink versions where the return types of {{cast}} functions are also boxed types, casting strings to numeric values are also affected by this bug.
> Currently for a quick fix we can rewrite the generated code. But for a long term solution we shouldn't use boxed types as internal data structures.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)