You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Martijn Visser (Jira)" <ji...@apache.org> on 2022/03/30 13:45:00 UTC
[jira] [Closed] (FLINK-25227) Comparing the equality of the same (boxed) numeric values returns false
[ https://issues.apache.org/jira/browse/FLINK-25227?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser closed FLINK-25227.
----------------------------------
Resolution: Fixed
> Comparing the equality of the same (boxed) numeric values returns false
> -----------------------------------------------------------------------
>
> Key: FLINK-25227
> URL: https://issues.apache.org/jira/browse/FLINK-25227
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Affects Versions: 1.14.0, 1.12.5, 1.13.3
> Reporter: Caizhi Weng
> Assignee: Marios Trivyzas
> Priority: Critical
> Labels: pull-request-available, stale-assigned
> Fix For: 1.15.0, 1.16.0, 1.13.7, 1.14.5
>
>
> Add the following test case to {{TableEnvironmentITCase}} to reproduce this bug.
> {code:scala}
> @Test
> def myTest(): Unit = {
> val data = Seq(
> Row.of(
> java.lang.Integer.valueOf(1000),
> java.lang.Integer.valueOf(2000),
> java.lang.Integer.valueOf(1000),
> java.lang.Integer.valueOf(2000))
> )
> tEnv.executeSql(
> s"""
> |create table T (
> | a int,
> | b int,
> | c int,
> | d int
> |) with (
> | 'connector' = 'values',
> | 'bounded' = 'true',
> | 'data-id' = '${TestValuesTableFactory.registerData(data)}'
> |)
> |""".stripMargin)
> tEnv.executeSql("select greatest(a, b) = greatest(c, d) from T").print()
> }
> {code}
> The result is false, which is obviously incorrect.
> This is caused by the generated java code:
> {code:java}
> public class StreamExecCalc$8 extends org.apache.flink.table.runtime.operators.TableStreamOperator
> implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> private final Object[] references;
> org.apache.flink.table.data.BoxedWrapperRowData out =
> new org.apache.flink.table.data.BoxedWrapperRowData(1);
> private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement =
> new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> public StreamExecCalc$8(
> Object[] references,
> org.apache.flink.streaming.runtime.tasks.StreamTask task,
> org.apache.flink.streaming.api.graph.StreamConfig config,
> org.apache.flink.streaming.api.operators.Output output,
> org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService)
> throws Exception {
> this.references = references;
> this.setup(task, config, output);
> if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
> ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
> .setProcessingTimeService(processingTimeService);
> }
> }
> @Override
> public void open() throws Exception {
> super.open();
> }
> @Override
> public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element)
> throws Exception {
> org.apache.flink.table.data.RowData in1 =
> (org.apache.flink.table.data.RowData) element.getValue();
> int field$0;
> boolean isNull$0;
> int field$1;
> boolean isNull$1;
> int field$3;
> boolean isNull$3;
> int field$4;
> boolean isNull$4;
> boolean isNull$6;
> boolean result$7;
> isNull$3 = in1.isNullAt(2);
> field$3 = -1;
> if (!isNull$3) {
> field$3 = in1.getInt(2);
> }
> isNull$0 = in1.isNullAt(0);
> field$0 = -1;
> if (!isNull$0) {
> field$0 = in1.getInt(0);
> }
> isNull$1 = in1.isNullAt(1);
> field$1 = -1;
> if (!isNull$1) {
> field$1 = in1.getInt(1);
> }
> isNull$4 = in1.isNullAt(3);
> field$4 = -1;
> if (!isNull$4) {
> field$4 = in1.getInt(3);
> }
> out.setRowKind(in1.getRowKind());
> java.lang.Integer result$2 = field$0;
> boolean nullTerm$2 = false;
> if (!nullTerm$2) {
> java.lang.Integer cur$2 = field$0;
> if (isNull$0) {
> nullTerm$2 = true;
> } else {
> int compareResult = result$2.compareTo(cur$2);
> if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
> result$2 = cur$2;
> }
> }
> }
> if (!nullTerm$2) {
> java.lang.Integer cur$2 = field$1;
> if (isNull$1) {
> nullTerm$2 = true;
> } else {
> int compareResult = result$2.compareTo(cur$2);
> if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
> result$2 = cur$2;
> }
> }
> }
> if (nullTerm$2) {
> result$2 = null;
> }
> java.lang.Integer result$5 = field$3;
> boolean nullTerm$5 = false;
> if (!nullTerm$5) {
> java.lang.Integer cur$5 = field$3;
> if (isNull$3) {
> nullTerm$5 = true;
> } else {
> int compareResult = result$5.compareTo(cur$5);
> if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
> result$5 = cur$5;
> }
> }
> }
> if (!nullTerm$5) {
> java.lang.Integer cur$5 = field$4;
> if (isNull$4) {
> nullTerm$5 = true;
> } else {
> int compareResult = result$5.compareTo(cur$5);
> if ((true && compareResult < 0) || (compareResult > 0 && !true)) {
> result$5 = cur$5;
> }
> }
> }
> if (nullTerm$5) {
> result$5 = null;
> }
> isNull$6 = nullTerm$2 || nullTerm$5;
> result$7 = false;
> if (!isNull$6) {
> result$7 = result$2 == result$5;
> }
> if (isNull$6) {
> out.setNullAt(0);
> } else {
> out.setBoolean(0, result$7);
> }
> output.collect(outElement.replace(out));
> }
> @Override
> public void close() throws Exception {
> super.close();
> }
> }
> {code}
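> You can see that line 137 of the generated code (the statement {{result$7 = result$2 == result$5;}}) compares two boxed {{Integer}} values with {{==}} instead of {{.equals}}, which causes this problem: {{==}} compares object references, and values such as 2000 fall outside the {{Integer}} cache, so two separately boxed instances holding the same value are not reference-equal.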
> In older Flink versions, where the return types of {{cast}} functions are also boxed types, casting strings to numeric values is also affected by this bug.
> As a quick fix, we can rewrite the generated code to compare the values with {{.equals}}. As a long-term solution, however, we shouldn't use boxed types as internal data structures.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)