You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by lz...@apache.org on 2022/02/14 02:46:46 UTC

[flink] branch release-1.14 updated: [FLINK-25227][table] Boxed numeric type should be considered when generating code for equality checking

This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 4b5e232  [FLINK-25227][table] Boxed numeric type should be considered when generating code for equality checking
4b5e232 is described below

commit 4b5e23236a4f46fe93bece00d37a9ed1d5a886ac
Author: tsreaper <ts...@gmail.com>
AuthorDate: Mon Feb 14 10:45:58 2022 +0800

    [FLINK-25227][table] Boxed numeric type should be considered when generating code for equality checking
    
    This closes #18291
---
 .../flink/table/planner/codegen/CodeGenUtils.scala |   7 +
 .../planner/codegen/calls/ScalarOperatorGens.scala |  12 +-
 .../planner/expressions/ScalarOperatorsTest.scala  | 268 ++++++++++++++++++++-
 .../table/runtime/functions/NumericUtils.java      |  46 ++++
 4 files changed, 330 insertions(+), 3 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
index 385938b..f851911 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGenUtils.scala
@@ -34,6 +34,7 @@ import org.apache.flink.table.data.utils.JoinedRowData
 import org.apache.flink.table.functions.UserDefinedFunction
 import org.apache.flink.table.planner.codegen.GenerateUtils.{generateInputFieldUnboxing, generateNonNullField}
 import org.apache.flink.table.runtime.dataview.StateDataViewStore
+import org.apache.flink.table.runtime.functions.NumericUtils
 import org.apache.flink.table.runtime.generated.{AggsHandleFunction, HashFunction, NamespaceAggsHandleFunction, TableAggsHandleFunction}
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isInteroperable
@@ -48,6 +49,10 @@ import org.apache.flink.table.types.logical.utils.LogicalTypeUtils.toInternalCon
 import org.apache.flink.table.types.utils.DataTypeUtils.isInternal
 import org.apache.flink.types.{Row, RowKind}
 
+import java.lang.reflect.Method
+import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong, Object => JObject, Short => JShort}
+import java.util.concurrent.atomic.AtomicLong
+
 import scala.annotation.tailrec
 
 object CodeGenUtils {
@@ -96,6 +101,8 @@ object CodeGenUtils {
 
   val ROW_KIND: String = className[RowKind]
 
+  val NUMERIC_UTIL: String = className[NumericUtils]
+
   val DECIMAL_UTIL: String = className[DecimalDataUtils]
 
   val SEGMENT: String = className[MemorySegment]
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 0c0673f..64aea62 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -607,7 +607,11 @@ object ScalarOperatorGens {
       }
       // both sides are numeric
       else if (isNumeric(left.resultType) && isNumeric(right.resultType)) {
-        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+        operator match {
+          case "==" => (leftTerm, rightTerm) => s"$NUMERIC_UTIL.equals($leftTerm, $rightTerm)"
+          case "!=" => (leftTerm, rightTerm) => s"!$NUMERIC_UTIL.equals($leftTerm, $rightTerm)"
+          case _ => (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+        }
       }
 
       // both sides are timestamp
@@ -626,7 +630,11 @@ object ScalarOperatorGens {
       // both sides are temporal of same type
       else if (isTemporal(left.resultType) &&
           isInteroperable(left.resultType, right.resultType)) {
-        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+        operator match {
+          case "==" => (leftTerm, rightTerm) => s"$NUMERIC_UTIL.equals($leftTerm, $rightTerm)"
+          case "!=" => (leftTerm, rightTerm) => s"!$NUMERIC_UTIL.equals($leftTerm, $rightTerm)"
+          case _ => (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+        }
       }
       // both sides are boolean
       else if (isBoolean(left.resultType) &&
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
index 501f9bd..74d346a 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarOperatorsTest.scala
@@ -18,7 +18,10 @@
 
 package org.apache.flink.table.planner.expressions
 
-import org.apache.flink.table.planner.expressions.utils.ScalarOperatorsTestBase
+import org.apache.flink.table.api.DataTypes
+import org.apache.flink.table.planner.expressions.utils.{ExpressionTestBase, ScalarOperatorsTestBase}
+import org.apache.flink.table.types.AbstractDataType
+import org.apache.flink.types.Row
 
 import org.junit.Test
 
@@ -274,3 +277,266 @@ class ScalarOperatorsTest extends ScalarOperatorsTestBase {
     testSqlApi("uuid() = cast(f22 as timestamp_ltz)", "null")
   }
 }
+
+class ScalarEqualityOperatorsTest extends ExpressionTestBase {
+  // these tests are extracted into a specific test class because they need specific data
+
+  @Test
+  def testEqualityForNumericValues(): Unit = {
+    // direct equality
+    for (i <- 0 to 6) {
+      testSqlApi(s"f${i * 2} = f${i * 2 + 1}", "true")
+      testSqlApi(s"f${14 + i * 2} = f${14 + i * 2 + 1}", "true")
+      testSqlApi(s"f${i * 2} = f${14 + i * 2}", "false")
+
+      testSqlApi(s"f${i * 2} <> f${i * 2 + 1}", "false")
+      testSqlApi(s"f${14 + i * 2} <> f${14 + i * 2 + 1}", "false")
+      testSqlApi(s"f${i * 2} <> f${14 + i * 2}", "true")
+    }
+
+    // equality between primitive numeric types
+    for (i <- 0 to 6) {
+      for (j <- 0 to 6) {
+        testSqlApi(s"f${i * 2} = f${j * 2}", "true")
+        testSqlApi(s"f${i * 2} = f${14 + j * 2}", "false")
+
+        testSqlApi(s"f${i * 2} <> f${j * 2}", "false")
+        testSqlApi(s"f${i * 2} <> f${14 + j * 2}", "true")
+      }
+    }
+    // byte is excluded in this test because 777 overflows its range
+    for (i <- 1 to 6) {
+      for (j <- 1 to 6) {
+        testSqlApi(s"f${14 + i * 2} = f${14 + j * 2}", "true")
+        testSqlApi(s"f${14 + i * 2} = f${j * 2}", "false")
+
+        testSqlApi(s"f${14 + i * 2} <> f${14 + j * 2}", "false")
+        testSqlApi(s"f${14 + i * 2} <> f${j * 2}", "true")
+      }
+    }
+
+    // equality with boxed numeric types (LEAST will return boxed internal data type)
+    for (i <- 0 to 6) {
+      testSqlApi(s"LEAST(f${i * 2}) = LEAST(f${i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${i * 2}) = f${i * 2 + 1}", "true")
+      testSqlApi(s"LEAST(f${i * 2}) = f${14 + i * 2}", "false")
+      testSqlApi(s"f${i * 2} = LEAST(f${i * 2 + 1})", "true")
+      testSqlApi(s"f${14 + i * 2} = LEAST(f${i * 2 + 1})", "false")
+
+      testSqlApi(s"LEAST(f${i * 2}) <> LEAST(f${i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${i * 2}) <> f${i * 2 + 1}", "false")
+      testSqlApi(s"LEAST(f${i * 2}) <> f${14 + i * 2}", "true")
+      testSqlApi(s"f${i * 2} <> LEAST(f${i * 2 + 1})", "false")
+      testSqlApi(s"f${14 + i * 2} <> LEAST(f${i * 2 + 1})", "true")
+    }
+    // byte is excluded in this test because 777 overflows its range
+    for (i <- 1 to 6) {
+      testSqlApi(s"LEAST(f${14 + i * 2}) = LEAST(f${14 + i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${i * 2}) = LEAST(f${14 + i * 2})", "false")
+      testSqlApi(s"LEAST(f${14 + i * 2}) = LEAST(f${i * 2})", "false")
+      testSqlApi(s"LEAST(f${14 + i * 2}) = f${14 + i * 2 + 1}", "true")
+      testSqlApi(s"LEAST(f${14 + i * 2}) = f${i * 2 + 1}", "false")
+      testSqlApi(s"f${14 + i * 2} = LEAST(f${14 + i * 2 + 1})", "true")
+      testSqlApi(s"f${i * 2} = LEAST(f${14 + i * 2 + 1})", "false")
+
+      testSqlApi(s"LEAST(f${14 + i * 2}) <> LEAST(f${14 + i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${i * 2}) <> LEAST(f${14 + i * 2})", "true")
+      testSqlApi(s"LEAST(f${14 + i * 2}) <> LEAST(f${i * 2})", "true")
+      testSqlApi(s"LEAST(f${14 + i * 2}) <> f${14 + i * 2 + 1}", "false")
+      testSqlApi(s"LEAST(f${14 + i * 2}) <> f${i * 2 + 1}", "true")
+      testSqlApi(s"f${14 + i * 2} <> LEAST(f${14 + i * 2 + 1})", "false")
+      testSqlApi(s"f${i * 2} <> LEAST(f${14 + i * 2 + 1})", "true")
+    }
+  }
+
+  @Test
+  def testEqualityForTimeValues(): Unit = {
+    // direct equality
+    for (i <- 0 to 3) {
+      testSqlApi(s"f${28 + i * 2} = f${28 + i * 2 + 1}", "true")
+      testSqlApi(s"f${36 + i * 2} = f${36 + i * 2 + 1}", "true")
+      testSqlApi(s"f${28 + i * 2} = f${36 + i * 2}", "false")
+
+      testSqlApi(s"f${28 + i * 2} <> f${28 + i * 2 + 1}", "false")
+      testSqlApi(s"f${36 + i * 2} <> f${36 + i * 2 + 1}", "false")
+      testSqlApi(s"f${28 + i * 2} <> f${36 + i * 2}", "true")
+    }
+
+    // equality with boxed time types (LEAST will return boxed internal data type)
+    for (i <- 0 to 3) {
+      testSqlApi(s"LEAST(f${28 + i * 2}) = LEAST(f${28 + i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${36 + i * 2}) = LEAST(f${36 + i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${28 + i * 2}) = LEAST(f${36 + i * 2 + 1})", "false")
+
+      testSqlApi(s"LEAST(f${28 + i * 2}) = f${28 + i * 2 + 1}", "true")
+      testSqlApi(s"f${28 + i * 2} = LEAST(f${28 + i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${36 + i * 2}) = f${36 + i * 2 + 1}", "true")
+      testSqlApi(s"f${36 + i * 2} = LEAST(f${36 + i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${28 + i * 2}) = f${36 + i * 2 + 1}", "false")
+      testSqlApi(s"f${28 + i * 2} = LEAST(f${36 + i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${28 + i * 2}) = f${36 + i * 2 + 1}", "false")
+      testSqlApi(s"f${28 + i * 2} = LEAST(f${36 + i * 2 + 1})", "false")
+
+      testSqlApi(s"LEAST(f${28 + i * 2}) <> LEAST(f${28 + i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${36 + i * 2}) <> LEAST(f${36 + i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${28 + i * 2}) <> LEAST(f${36 + i * 2 + 1})", "true")
+
+      testSqlApi(s"LEAST(f${28 + i * 2}) <> f${28 + i * 2 + 1}", "false")
+      testSqlApi(s"f${28 + i * 2} <> LEAST(f${28 + i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${36 + i * 2}) <> f${36 + i * 2 + 1}", "false")
+      testSqlApi(s"f${36 + i * 2} <> LEAST(f${36 + i * 2 + 1})", "false")
+      testSqlApi(s"LEAST(f${28 + i * 2}) <> f${36 + i * 2 + 1}", "true")
+      testSqlApi(s"f${28 + i * 2} <> LEAST(f${36 + i * 2 + 1})", "true")
+      testSqlApi(s"LEAST(f${28 + i * 2}) <> f${36 + i * 2 + 1}", "true")
+      testSqlApi(s"f${28 + i * 2} <> LEAST(f${36 + i * 2 + 1})", "true")
+    }
+  }
+
+  @Test
+  def testEqualityForBooleanValues(): Unit = {
+    // direct equality
+    testSqlApi("f44 = f45", "true")
+    testSqlApi("f46 = f47", "true")
+    testSqlApi("f44 = f46", "false")
+    testSqlApi("f45 = f47", "false")
+
+    testSqlApi("f44 <> f45", "false")
+    testSqlApi("f46 <> f47", "false")
+    testSqlApi("f44 <> f46", "true")
+    testSqlApi("f45 <> f47", "true")
+
+    // equality with boxed boolean types (LEAST will return boxed internal data type)
+    testSqlApi("LEAST(f44) = LEAST(f45)", "true")
+    testSqlApi("LEAST(f46) = LEAST(f47)", "true")
+    testSqlApi("LEAST(f44) = LEAST(f47)", "false")
+    testSqlApi("LEAST(f46) = LEAST(f45)", "false")
+
+    testSqlApi("LEAST(f44) <> LEAST(f45)", "false")
+    testSqlApi("LEAST(f46) <> LEAST(f47)", "false")
+    testSqlApi("LEAST(f44) <> LEAST(f47)", "true")
+    testSqlApi("LEAST(f46) <> LEAST(f45)", "true")
+
+    testSqlApi("LEAST(f44) = f45", "true")
+    testSqlApi("f44 = LEAST(f45)", "true")
+    testSqlApi("LEAST(f46) = f47", "true")
+    testSqlApi("f46 = LEAST(f47)", "true")
+    testSqlApi("LEAST(f44) = f47", "false")
+    testSqlApi("f44 = LEAST(f47)", "false")
+    testSqlApi("LEAST(f46) = f45", "false")
+    testSqlApi("f46 = LEAST(f45)", "false")
+
+    testSqlApi("LEAST(f44) <> f45", "false")
+    testSqlApi("f44 <> LEAST(f45)", "false")
+    testSqlApi("LEAST(f46) <> f47", "false")
+    testSqlApi("f46 <> LEAST(f47)", "false")
+    testSqlApi("LEAST(f44) <> f47", "true")
+    testSqlApi("f44 <> LEAST(f47)", "true")
+    testSqlApi("LEAST(f46) <> f45", "true")
+    testSqlApi("f46 <> LEAST(f45)", "true")
+  }
+
+  override def testData: Row = {
+    Row.of(
+      // numeric values in range [-128, 127]
+      java.lang.Byte.valueOf("7"), java.lang.Byte.valueOf("7"),
+      java.lang.Short.valueOf("7"), java.lang.Short.valueOf("7"),
+      java.lang.Integer.valueOf(7), java.lang.Integer.valueOf(7),
+      java.lang.Long.valueOf(7), java.lang.Long.valueOf(7),
+      java.lang.Float.valueOf(7), java.lang.Float.valueOf(7),
+      java.lang.Double.valueOf(7), java.lang.Double.valueOf(7),
+      new java.math.BigDecimal("7"), new java.math.BigDecimal("7"),
+
+      // numeric values out of range [-128, 127], except for bytes
+      java.lang.Byte.valueOf("77"), java.lang.Byte.valueOf("77"),
+      java.lang.Short.valueOf("777"), java.lang.Short.valueOf("777"),
+      java.lang.Integer.valueOf(777), java.lang.Integer.valueOf(777),
+      java.lang.Long.valueOf(777), java.lang.Long.valueOf(777),
+      java.lang.Float.valueOf(777), java.lang.Float.valueOf(777),
+      java.lang.Double.valueOf(777), java.lang.Double.valueOf(777),
+      new java.math.BigDecimal("777"), new java.math.BigDecimal("777"),
+
+      // time values whose internal data representation in range [-128, 127]
+      java.time.LocalDate.ofEpochDay(7), java.time.LocalDate.ofEpochDay(7),
+      // currently Flink SQL does not support time with precision > 0,
+      // so the only integer second we can pick is 0
+      java.time.LocalTime.ofSecondOfDay(0), java.time.LocalTime.ofSecondOfDay(0),
+      java.time.LocalDateTime.ofEpochSecond(0, 7000000, java.time.ZoneOffset.UTC),
+      java.time.LocalDateTime.ofEpochSecond(0, 7000000, java.time.ZoneOffset.UTC),
+      java.time.Instant.ofEpochMilli(7), java.time.Instant.ofEpochMilli(7),
+
+      // time values whose internal data representation out of range [-128, 127]
+      java.time.LocalDate.ofEpochDay(7000), java.time.LocalDate.ofEpochDay(7000),
+      java.time.LocalTime.ofSecondOfDay(7), java.time.LocalTime.ofSecondOfDay(7),
+      java.time.LocalDateTime.ofEpochSecond(7, 0, java.time.ZoneOffset.UTC),
+      java.time.LocalDateTime.ofEpochSecond(7, 0, java.time.ZoneOffset.UTC),
+      java.time.Instant.ofEpochMilli(7000), java.time.Instant.ofEpochMilli(7000),
+
+      // boolean values
+      java.lang.Boolean.valueOf(true), java.lang.Boolean.valueOf(true),
+      java.lang.Boolean.valueOf(false), java.lang.Boolean.valueOf(false))
+  }
+
+  override def testDataType: AbstractDataType[_] = {
+    DataTypes.ROW(
+      // numeric values in range [-128, 127]
+      DataTypes.FIELD("f0", DataTypes.TINYINT()),
+      DataTypes.FIELD("f1", DataTypes.TINYINT()),
+      DataTypes.FIELD("f2", DataTypes.SMALLINT()),
+      DataTypes.FIELD("f3", DataTypes.SMALLINT()),
+      DataTypes.FIELD("f4", DataTypes.INT()),
+      DataTypes.FIELD("f5", DataTypes.INT()),
+      DataTypes.FIELD("f6", DataTypes.BIGINT()),
+      DataTypes.FIELD("f7", DataTypes.BIGINT()),
+      DataTypes.FIELD("f8", DataTypes.FLOAT()),
+      DataTypes.FIELD("f9", DataTypes.FLOAT()),
+      DataTypes.FIELD("f10", DataTypes.DOUBLE()),
+      DataTypes.FIELD("f11", DataTypes.DOUBLE()),
+      DataTypes.FIELD("f12", DataTypes.DECIMAL(10, 3)),
+      DataTypes.FIELD("f13", DataTypes.DECIMAL(10, 3)),
+
+      // numeric values out of range [-128, 127]
+      DataTypes.FIELD("f14", DataTypes.TINYINT()),
+      DataTypes.FIELD("f15", DataTypes.TINYINT()),
+      DataTypes.FIELD("f16", DataTypes.SMALLINT()),
+      DataTypes.FIELD("f17", DataTypes.SMALLINT()),
+      DataTypes.FIELD("f18", DataTypes.INT()),
+      DataTypes.FIELD("f19", DataTypes.INT()),
+      DataTypes.FIELD("f20", DataTypes.BIGINT()),
+      DataTypes.FIELD("f21", DataTypes.BIGINT()),
+      DataTypes.FIELD("f22", DataTypes.FLOAT()),
+      DataTypes.FIELD("f23", DataTypes.FLOAT()),
+      DataTypes.FIELD("f24", DataTypes.DOUBLE()),
+      DataTypes.FIELD("f25", DataTypes.DOUBLE()),
+      DataTypes.FIELD("f26", DataTypes.DECIMAL(10, 3)),
+      DataTypes.FIELD("f27", DataTypes.DECIMAL(10, 3)),
+
+      // time values whose internal data representation in range [-128, 127]
+      DataTypes.FIELD("f28", DataTypes.DATE()),
+      DataTypes.FIELD("f29", DataTypes.DATE()),
+      DataTypes.FIELD("f30", DataTypes.TIME(0)),
+      DataTypes.FIELD("f31", DataTypes.TIME(0)),
+      DataTypes.FIELD("f32", DataTypes.TIMESTAMP(3)),
+      DataTypes.FIELD("f33", DataTypes.TIMESTAMP(3)),
+      DataTypes.FIELD("f34", DataTypes.TIMESTAMP_LTZ(3)),
+      DataTypes.FIELD("f35", DataTypes.TIMESTAMP_LTZ(3)),
+
+      // time values whose internal data representation out of range [-128, 127]
+      DataTypes.FIELD("f36", DataTypes.DATE()),
+      DataTypes.FIELD("f37", DataTypes.DATE()),
+      DataTypes.FIELD("f38", DataTypes.TIME(0)),
+      DataTypes.FIELD("f39", DataTypes.TIME(0)),
+      DataTypes.FIELD("f40", DataTypes.TIMESTAMP(3)),
+      DataTypes.FIELD("f41", DataTypes.TIMESTAMP(3)),
+      DataTypes.FIELD("f42", DataTypes.TIMESTAMP_LTZ(3)),
+      DataTypes.FIELD("f43", DataTypes.TIMESTAMP_LTZ(3)),
+
+      // boolean values
+      DataTypes.FIELD("f44", DataTypes.BOOLEAN()),
+      DataTypes.FIELD("f45", DataTypes.BOOLEAN()),
+      DataTypes.FIELD("f46", DataTypes.BOOLEAN()),
+      DataTypes.FIELD("f47", DataTypes.BOOLEAN())
+    )
+  }
+
+  override def containsLegacyTypes: Boolean = false
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/NumericUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/NumericUtils.java
new file mode 100644
index 0000000..6ce3b991
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/NumericUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.	See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.	You may obtain a copy of the License at
+ *
+ *		http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions;
+
+/** Utility functions for numeric types: tinyint, smallint, int, long, float and double. */
+public class NumericUtils {
+
+    public static boolean equals(byte a, byte b) {
+        return a == b;
+    }
+
+    public static boolean equals(short a, short b) {
+        return a == b;
+    }
+
+    public static boolean equals(int a, int b) {
+        return a == b;
+    }
+
+    public static boolean equals(long a, long b) {
+        return a == b;
+    }
+
+    public static boolean equals(float a, float b) {
+        return a == b;
+    }
+
+    public static boolean equals(double a, double b) {
+        return a == b;
+    }
+}