You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/05 03:07:52 UTC

[flink] branch release-1.11 updated: [FLINK-16577][table-planner-blink] Fix numeric type mismatch error in  column interval relmetadata

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new a3a34b7  [FLINK-16577][table-planner-blink] Fix numeric type mismatch error in  column interval relmetadata
a3a34b7 is described below

commit a3a34b7d6b0e2f8c9ff4974c9947604fac8f8872
Author: godfrey he <go...@163.com>
AuthorDate: Fri Jun 5 10:19:27 2020 +0800

    [FLINK-16577][table-planner-blink] Fix numeric type mismatch error in  column interval relmetadata
    
    This closes #12326
---
 .../plan/metadata/FlinkRelMdColumnInterval.scala   |  48 ++--
 .../planner/plan/utils/ColumnIntervalUtil.scala    |  43 +---
 .../table/planner/plan/utils/FlinkRelOptUtil.scala |  60 +----
 .../planner/catalog/CatalogStatisticsTest.java     |  17 +-
 .../planner/plan/stream/sql/agg/AggregateTest.xml  |  21 ++
 .../metadata/FlinkRelMdColumnIntervalTest.scala    | 273 +++++++++++----------
 .../FlinkRelMdFilteredColumnIntervalTest.scala     | 113 +++++----
 .../plan/metadata/FlinkRelMdHandlerTestBase.scala  |   8 +
 .../plan/stream/sql/agg/AggregateTest.scala        |   6 +
 .../planner/utils/ColumnIntervalUtilTest.scala     |  26 --
 10 files changed, 301 insertions(+), 314 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index 7d4ab3f..762236e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -40,6 +40,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.{SqlBinaryOperator, SqlKind}
 import org.apache.calcite.util.Util
 
+import java.math.{BigDecimal => JBigDecimal}
+
 import scala.collection.JavaConversions._
 
 /**
@@ -75,9 +77,9 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
         (minValue == null && maxValue == null) || (max == null && min == null))
 
       if (minValue != null || maxValue != null) {
-        ValueInterval(minValue, maxValue)
+        ValueInterval(convertNumberToBigDecimal(minValue), convertNumberToBigDecimal(maxValue))
       } else if (max != null || min != null) {
-        ValueInterval(min, max)
+        ValueInterval(convertNumberToBigDecimal(min), convertNumberToBigDecimal(max))
       } else {
         null
       }
@@ -86,6 +88,22 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
     }
   }
 
+  private def convertNumberToBigDecimal(number: Number): Number = {
+    if (number != null) {
+      new JBigDecimal(number.toString)
+    } else {
+      number
+    }
+  }
+
+  private def convertNumberToBigDecimal(comparable: Comparable[_]): Comparable[_] = {
+    if (comparable != null && comparable.isInstanceOf[Number]) {
+      new JBigDecimal(comparable.toString)
+    } else {
+      comparable
+    }
+  }
+
   /**
     * Gets interval of the given column on Values.
     *
@@ -99,7 +117,9 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
     if (tuples.isEmpty) {
       EmptyValueInterval
     } else {
-      val values = tuples.map(t => FlinkRelOptUtil.getLiteralValue(t.get(index))).filter(_ != null)
+      val values = tuples.map {
+        t => FlinkRelOptUtil.getLiteralValueByBroadType(t.get(index))
+      }.filter(_ != null)
       if (values.isEmpty) {
         EmptyValueInterval
       } else {
@@ -135,7 +155,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
     projects.get(index) match {
       case inputRef: RexInputRef => fmq.getColumnInterval(project.getInput, inputRef.getIndex)
       case literal: RexLiteral =>
-        val literalValue = FlinkRelOptUtil.getLiteralValue(literal)
+        val literalValue = FlinkRelOptUtil.getLiteralValueByBroadType(literal)
         if (literalValue == null) {
           ValueInterval.empty
         } else {
@@ -211,7 +231,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
         }
 
       case literal: RexLiteral =>
-        val literalValue = FlinkRelOptUtil.getLiteralValue(literal)
+        val literalValue = FlinkRelOptUtil.getLiteralValueByBroadType(literal)
         if (literalValue == null) {
           ValueInterval.empty
         } else {
@@ -235,7 +255,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
         fmq.getColumnInterval(baseNode.getInput, inputRef.getIndex)
 
       case literal: RexLiteral =>
-        val literalValue = FlinkRelOptUtil.getLiteralValue(literal)
+        val literalValue = FlinkRelOptUtil.getLiteralValueByBroadType(literal)
         if (literalValue == null) {
           ValueInterval.empty
         } else {
@@ -310,7 +330,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
         case inputRef: RexInputRef =>
           Some(fmq.getColumnInterval(expand.getInput, inputRef.getIndex))
         case l: RexLiteral if l.getTypeName eq SqlTypeName.DECIMAL =>
-          val v = l.getValueAs(classOf[java.lang.Long])
+          val v = l.getValueAs(classOf[JBigDecimal])
           Some(ValueInterval(v, v))
         case l: RexLiteral if l.getValue == null =>
           None
@@ -342,17 +362,14 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
     val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
     if (index == rankFunColumnIndex) {
       rank.rankRange match {
-        case r: ConstantRankRange => ValueInterval(r.getRankStart, r.getRankEnd)
+        case r: ConstantRankRange =>
+          ValueInterval(JBigDecimal.valueOf(r.getRankStart), JBigDecimal.valueOf(r.getRankEnd))
         case v: VariableRankRange =>
           val interval = fmq.getColumnInterval(rank.getInput, v.getRankEndIndex)
           interval match {
             case hasUpper: WithUpper =>
-              val lower = ColumnIntervalUtil.convertStringToNumber("1", hasUpper.upper.getClass)
-              lower match {
-                case Some(l) =>
-                  ValueInterval(l, hasUpper.upper, includeUpper = hasUpper.includeUpper)
-                case _ => null
-              }
+              val lower = JBigDecimal.valueOf(1)
+              ValueInterval(lower, hasUpper.upper, includeUpper = hasUpper.includeUpper)
             case _ => null
           }
       }
@@ -646,7 +663,8 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
               } else {
                 null
               }
-            case COUNT => RightSemiInfiniteValueInterval(0, includeLower = true)
+            case COUNT =>
+              RightSemiInfiniteValueInterval(JBigDecimal.valueOf(0), includeLower = true)
             // TODO add more built-in agg functions
             case _ => null
           }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
index 3852f4f..482092c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.planner.plan.stats._
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.ColumnRelatedVisitor
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.getLiteralValueByBroadType
 
 import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, RexNode, RexUtil}
 import org.apache.calcite.sql.SqlKind
@@ -228,13 +229,13 @@ object ColumnIntervalUtil {
       val (literalValue, op) = (convertedCondition.operands.head, convertedCondition.operands.last)
       match {
         case (_: RexInputRef, literal: RexLiteral) =>
-          (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind)
+          (getLiteralValueByBroadType(literal), convertedCondition.getKind)
         case (rex: RexCall, literal: RexLiteral) if rex.getKind == SqlKind.AS =>
-          (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind)
+          (getLiteralValueByBroadType(literal), convertedCondition.getKind)
         case (literal: RexLiteral, _: RexInputRef) =>
-          (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind.reverse())
+          (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
         case (literal: RexLiteral, rex: RexCall) if rex.getKind == SqlKind.AS =>
-          (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind.reverse())
+          (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
         case _ => (null, null)
       }
       if (op == null || literalValue == null) {
@@ -295,38 +296,4 @@ object ColumnIntervalUtil {
     case _ => None
   }
 
-  def convertStringToNumber(number: String, clazz: Class[_]): Option[Comparable[_]] = {
-    if (clazz == classOf[java.lang.Byte]) {
-      Some(java.lang.Byte.valueOf(number))
-    } else if (clazz == classOf[java.lang.Short]) {
-      Some(java.lang.Short.valueOf(number))
-    } else if (clazz == classOf[java.lang.Integer]) {
-      Some(java.lang.Integer.valueOf(number))
-    } else if (clazz == classOf[java.lang.Float]) {
-      Some(java.lang.Float.valueOf(number))
-    } else if (clazz == classOf[java.lang.Long]) {
-      Some(java.lang.Long.valueOf(number))
-    } else if (clazz == classOf[java.lang.Double]) {
-      Some(java.lang.Double.valueOf(number))
-    } else if (clazz == classOf[java.math.BigDecimal]) {
-      Some(new java.math.BigDecimal(number))
-    } else if (clazz == classOf[java.math.BigInteger]) {
-      Some(new java.math.BigInteger(number))
-    } else if (clazz == classOf[scala.Byte]) {
-      Some(number.toByte)
-    } else if (clazz == classOf[scala.Short]) {
-      Some(number.toShort)
-    } else if (clazz == classOf[scala.Int]) {
-      Some(number.toInt)
-    } else if (clazz == classOf[scala.Long]) {
-      Some(number.toLong)
-    } else if (clazz == classOf[scala.Float]) {
-      Some(number.toFloat)
-    } else if (clazz == classOf[scala.Double]) {
-      Some(number.toDouble)
-    } else {
-      None
-    }
-  }
-
 }
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
index 68f6598..8c2c6de 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
@@ -18,9 +18,9 @@
 package org.apache.flink.table.planner.plan.utils
 
 import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.planner.JBoolean
 import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory}
 import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
-import org.apache.flink.table.planner.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
 
 import org.apache.calcite.config.NullCollation
 import org.apache.calcite.plan.RelOptUtil
@@ -30,13 +30,11 @@ import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, Rex
 import org.apache.calcite.sql.SqlExplainLevel
 import org.apache.calcite.sql.SqlKind._
 import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.util.ImmutableBitSet
 import org.apache.commons.math3.util.ArithmeticUtils
 
 import java.io.{PrintWriter, StringWriter}
 import java.math.BigDecimal
 import java.sql.{Date, Time, Timestamp}
-import java.util
 import java.util.Calendar
 
 import scala.collection.JavaConversions._
@@ -207,25 +205,22 @@ object FlinkRelOptUtil {
   }
 
   /**
-    * Gets values of RexLiteral
+    * Gets values of [[RexLiteral]] by its broad type.
+   *
+   * <p> All number value (TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL)
+   * will be converted to BigDecimal
     *
     * @param literal input RexLiteral
-    * @return values of the input RexLiteral
+    * @return value of the input RexLiteral
     */
-  def getLiteralValue(literal: RexLiteral): Comparable[_] = {
+  def getLiteralValueByBroadType(literal: RexLiteral): Comparable[_] = {
     if (literal.isNull) {
       null
     } else {
-      val literalType = literal.getType
-      literalType.getSqlTypeName match {
+      literal.getTypeName match {
         case BOOLEAN => RexLiteral.booleanValue(literal)
-        case TINYINT => literal.getValueAs(classOf[JByte])
-        case SMALLINT => literal.getValueAs(classOf[JShort])
-        case INTEGER => literal.getValueAs(classOf[Integer])
-        case BIGINT => literal.getValueAs(classOf[JLong])
-        case FLOAT => literal.getValueAs(classOf[JFloat])
-        case DOUBLE => literal.getValueAs(classOf[JDouble])
-        case DECIMAL => literal.getValue3.asInstanceOf[BigDecimal]
+        case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL =>
+          literal.getValue3.asInstanceOf[BigDecimal]
         case VARCHAR | CHAR => literal.getValueAs(classOf[String])
 
         // temporal types
@@ -236,43 +231,12 @@ object FlinkRelOptUtil {
         case TIMESTAMP =>
           new Timestamp(literal.getValueAs(classOf[Calendar]).getTimeInMillis)
         case _ =>
-          throw new IllegalArgumentException(s"Literal type $literalType is not supported!")
+          throw new IllegalArgumentException(
+            s"Literal type ${literal.getTypeName} is not supported!")
       }
     }
   }
 
-  private def fix(operands: util.List[RexNode], before: Int, after: Int): Unit = {
-    if (before == after) {
-      return
-    }
-    operands.indices.foreach { i =>
-      val node = operands.get(i)
-      operands.set(i, RexUtil.shift(node, before, after - before))
-    }
-  }
-
-  /**
-    * Categorizes whether a bit set contains bits left and right of a line.
-    */
-  private object Side extends Enumeration {
-    type Side = Value
-    val LEFT, RIGHT, BOTH, EMPTY = Value
-
-    private[plan] def of(bitSet: ImmutableBitSet, middle: Int): Side = {
-      val firstBit = bitSet.nextSetBit(0)
-      if (firstBit < 0) {
-        return EMPTY
-      }
-      if (firstBit >= middle) {
-        return RIGHT
-      }
-      if (bitSet.nextSetBit(middle) < 0) {
-        return LEFT
-      }
-      BOTH
-    }
-  }
-
   /**
     * Partitions the [[RexNode]] in two [[RexNode]] according to a predicate.
     * The result is a pair of RexNode: the first RexNode consists of RexNode that satisfy the
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
index d7fa694..6f6d588 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
@@ -55,6 +55,7 @@ import org.apache.calcite.util.ImmutableBitSet;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -158,7 +159,9 @@ public class CatalogStatisticsTest {
 		// long type
 		assertEquals(46.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null), 0.0);
 		assertEquals(154.0, mq.getColumnNullCount(t1, 0), 0.0);
-		assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(t1, 0));
+		assertEquals(
+				ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123L), BigDecimal.valueOf(763322L), true, true),
+				mq.getColumnInterval(t1, 0));
 
 		// string type
 		assertEquals(40.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null), 0.0);
@@ -183,7 +186,9 @@ public class CatalogStatisticsTest {
 		// long type
 		assertEquals(46.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null), 0.0);
 		assertEquals(154.0, mq.getColumnNullCount(t1, 0), 0.0);
-		assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(t1, 0));
+		assertEquals(
+				ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123L), BigDecimal.valueOf(763322L), true, true),
+				mq.getColumnInterval(t1, 0));
 
 		// string type
 		assertEquals(40.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null), 0.0);
@@ -324,7 +329,9 @@ public class CatalogStatisticsTest {
 		// long type
 		assertEquals(23.0, mq.getDistinctRowCount(rel, ImmutableBitSet.of(1), null), 0.0);
 		assertEquals(77.0, mq.getColumnNullCount(rel, 1), 0.0);
-		assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(rel, 1));
+		assertEquals(
+				ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123L), BigDecimal.valueOf(763322L), true, true),
+				mq.getColumnInterval(rel, 1));
 
 		// string type
 		assertEquals(20.0, mq.getDistinctRowCount(rel, ImmutableBitSet.of(2), null), 0.0);
@@ -343,7 +350,9 @@ public class CatalogStatisticsTest {
 		// double type
 		assertEquals(73.0, mq.getDistinctRowCount(rel, ImmutableBitSet.of(4), null), 0.0);
 		assertEquals(27.0, mq.getColumnNullCount(rel, 4), 0.0);
-		assertEquals(ValueInterval$.MODULE$.apply(-123.35, 7633.22, true, true), mq.getColumnInterval(rel, 4));
+		assertEquals(
+				ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123.35), BigDecimal.valueOf(7633.22), true, true),
+				mq.getColumnInterval(rel, 4));
 	}
 
 	private void alterTableStatisticsWithUnknownRowCount(
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index b5a75b6..e4d3b2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -215,6 +215,27 @@ GroupAggregate(select=[AVG_RETRACT(a) AS EXPR$0], changelogMode=[I,UA,D])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testColumnIntervalValidation">
+    <Resource name="sql">
+      <![CDATA[SELECT b, SUM(a) FROM MyTable WHERE a > 0.1 and a < 10 GROUP BY b]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
++- LogicalProject(b=[$1], a=[$0])
+   +- LogicalFilter(condition=[AND(>($0, 0.1:DECIMAL(2, 1)), <($0, 10))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, proctime, rowtime)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+GroupAggregate(groupBy=[b], select=[b, SUM(a) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+   +- Calc(select=[b, a], where=[AND(>(a, 0.1:DECIMAL(2, 1)), <(a, 10))])
+      +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, proctime, rowtime)]]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testGroupByWithConstantKey">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index fee3ce5..da04c77 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -44,11 +44,11 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnTableScan(): Unit = {
     Array(studentLogicalScan, studentFlinkLogicalScan, studentBatchScan, studentStreamScan)
       .foreach { scan =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(scan, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(scan, 0))
         assertNull(mq.getColumnInterval(scan, 1))
-        assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(scan, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(scan, 3))
-        assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(scan, 4))
+        assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(scan, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(scan, 3))
+        assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(scan, 4))
         assertNull(mq.getColumnInterval(scan, 5))
         assertNull(mq.getColumnInterval(scan, 6))
       }
@@ -66,7 +66,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
       assertEquals(ValueInterval.empty, mq.getColumnInterval(emptyValues, idx))
     }
 
-    assertEquals(ValueInterval(1L, 3L), mq.getColumnInterval(logicalValues, 0))
+    assertEquals(ValueInterval(bd(1L), bd(3L)), mq.getColumnInterval(logicalValues, 0))
     assertEquals(ValueInterval(false, true), mq.getColumnInterval(logicalValues, 1))
     assertEquals(ValueInterval(
       new Date(new DateString(2017, 9, 1).getMillisSinceEpoch),
@@ -80,7 +80,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
       new Timestamp(new TimestampString(2017, 7, 1, 1, 0, 0).getMillisSinceEpoch),
       new Timestamp(new TimestampString(2017, 10, 1, 1, 0, 0).getMillisSinceEpoch)),
       mq.getColumnInterval(logicalValues, 4))
-    assertEquals(ValueInterval(-1D, 3.12D), mq.getColumnInterval(logicalValues, 5))
+    assertEquals(ValueInterval(bd(-1D), bd(3.12D)), mq.getColumnInterval(logicalValues, 5))
     assertEquals(ValueInterval.empty, mq.getColumnInterval(logicalValues, 6))
     assertEquals(ValueInterval("F", "xyz"), mq.getColumnInterval(logicalValues, 7))
   }
@@ -94,17 +94,19 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
 
   @Test
   def testGetColumnIntervalOnProject(): Unit = {
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(logicalProject, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(logicalProject, 0))
     assertNull(mq.getColumnInterval(logicalProject, 1))
-    assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(logicalProject, 2))
-    assertEqualsAsDouble(ValueInterval(11, 17), mq.getColumnInterval(logicalProject, 3))
-    assertEqualsAsDouble(ValueInterval(177.1, 189.31), mq.getColumnInterval(logicalProject, 4))
+    assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(logicalProject, 2))
+    assertEqualsAsDouble(ValueInterval(bd(11), bd(17)), mq.getColumnInterval(logicalProject, 3))
+    assertEqualsAsDouble(
+      ValueInterval(bd(177.1), bd(189.31)), mq.getColumnInterval(logicalProject, 4))
     assertNull(mq.getColumnInterval(logicalProject, 5))
-    assertEqualsAsDouble(ValueInterval(161.0D, 172.1), mq.getColumnInterval(logicalProject, 6))
-    assertEquals(ValueInterval(1, 2), mq.getColumnInterval(logicalProject, 7))
+    assertEqualsAsDouble(
+      ValueInterval(bd(161.0D), bd(172.1)), mq.getColumnInterval(logicalProject, 6))
+    assertEquals(ValueInterval(bd(1), bd(2)), mq.getColumnInterval(logicalProject, 7))
     assertEquals(ValueInterval(true, true), mq.getColumnInterval(logicalProject, 8))
-    assertEquals(ValueInterval(2.1D, 2.1D), mq.getColumnInterval(logicalProject, 9))
-    assertEquals(ValueInterval(2L, 2L), mq.getColumnInterval(logicalProject, 10))
+    assertEquals(ValueInterval(bd(2.1D), bd(2.1D)), mq.getColumnInterval(logicalProject, 9))
+    assertEquals(ValueInterval(bd(2L), bd(2L)), mq.getColumnInterval(logicalProject, 10))
     assertNull(mq.getColumnInterval(logicalProject, 11))
 
     // 3 * (score - 2)
@@ -128,8 +130,8 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     val expr0 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(-1))
     // id <= 20
     val expr1 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(0), relBuilder.literal(20))
-    // id > 10
-    val expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(10))
+    // id > 10.0 (note: the types of id and literal are different)
+    val expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(10.0))
     // DIV(id, 2) > 3
     val expr3 = relBuilder.call(GREATER_THAN,
       relBuilder.call(DIVIDE, relBuilder.field(0), relBuilder.literal(2)),
@@ -145,33 +147,36 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
 
     // id > -1
     val filter0 = relBuilder.push(ts).filter(expr0).build
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(filter0, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(filter0, 0))
 
     // id <= 20
     val filter1 = relBuilder.push(ts).filter(expr1).build
-    assertEquals(ValueInterval(0, 20), mq.getColumnInterval(filter1, 0))
+    assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(filter1, 0))
 
     // id <= 20 AND id > 10 AND DIV(id, 2) > 3
     val filter2 = relBuilder.push(ts).filter(expr1, expr2, expr3).build
-    assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(filter2, 0))
+    assertEquals(
+      ValueInterval(bd(10.0), bd(20), includeLower = false), mq.getColumnInterval(filter2, 0))
 
     // id <= 20 AND id > 10 AND score < 4.1
     val filter3 = relBuilder.push(ts).filter(expr1, expr2, expr4).build
-    assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(filter3, 0))
+    assertEquals(
+      ValueInterval(bd(10.0), bd(20), includeLower = false),
+      mq.getColumnInterval(filter3, 0))
 
     // score > 6.0 OR score <= 4.0
     val filter4 = relBuilder.push(ts).filter(relBuilder.call(OR, expr5, expr6)).build
-    assertEquals(ValueInterval(2.7, 4.0), mq.getColumnInterval(filter4, 2))
+    assertEquals(ValueInterval(bd(2.7), bd(4.0)), mq.getColumnInterval(filter4, 2))
 
     // score > 6.0 OR score <= 4.0 OR id < 20
     val filter5 = relBuilder.push(ts).filter(relBuilder.call(OR, expr5, expr6, expr1)).build
-    assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(filter5, 2))
+    assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(filter5, 2))
 
     // (id <= 20 AND score < 4.1) OR NOT(DIV(id, 2) > 3 OR score > 1.9)
     val filter6 = relBuilder.push(ts).filter(relBuilder.call(OR,
       relBuilder.call(AND, expr1, expr4),
       relBuilder.call(NOT, relBuilder.call(OR, expr3, expr7)))).build
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(filter6, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(filter6, 0))
 
     // (id <= 20 AND score < 4.1) OR NOT(id <= 20 OR score > 1.9)
     val filter7 = relBuilder.push(ts).filter(relBuilder.call(OR,
@@ -180,7 +185,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
         relBuilder.call(OR,
           RexUtil.negate(relBuilder.getRexBuilder, expr1.asInstanceOf[RexCall]),
           expr7)))).build
-    assertEquals(ValueInterval(0, 20), mq.getColumnInterval(filter7, 0))
+    assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(filter7, 0))
   }
 
   @Test
@@ -210,46 +215,48 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
 
     // calc => projects + filter(id <= 20)
     val calc1 = createLogicalCalc(studentLogicalScan, outputRowType, projects, List(expr1))
-    assertEquals(ValueInterval(0, 20), mq.getColumnInterval(calc1, 0))
+    assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(calc1, 0))
     assertNull(mq.getColumnInterval(calc1, 1))
-    assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(calc1, 2))
-    assertEqualsAsDouble(ValueInterval(11, 17), mq.getColumnInterval(calc1, 3))
-    assertEqualsAsDouble(ValueInterval(177.1, 189.31), mq.getColumnInterval(calc1, 4))
+    assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(calc1, 2))
+    assertEqualsAsDouble(ValueInterval(bd(11), bd(17)), mq.getColumnInterval(calc1, 3))
+    assertEqualsAsDouble(ValueInterval(bd(177.1), bd(189.31)), mq.getColumnInterval(calc1, 4))
     assertNull(mq.getColumnInterval(calc1, 5))
-    assertEqualsAsDouble(ValueInterval(161.0D, 172.1), mq.getColumnInterval(calc1, 6))
-    assertEquals(ValueInterval(1, 2), mq.getColumnInterval(calc1, 7))
+    assertEqualsAsDouble(ValueInterval(bd(161.0D), bd(172.1)), mq.getColumnInterval(calc1, 6))
+    assertEquals(ValueInterval(bd(1), bd(2)), mq.getColumnInterval(calc1, 7))
     assertEquals(ValueInterval(true, true), mq.getColumnInterval(calc1, 8))
-    assertEquals(ValueInterval(2.1D, 2.1D), mq.getColumnInterval(calc1, 9))
-    assertEquals(ValueInterval(2L, 2L), mq.getColumnInterval(calc1, 10))
+    assertEquals(ValueInterval(bd(2.1D), bd(2.1D)), mq.getColumnInterval(calc1, 9))
+    assertEquals(ValueInterval(bd(2L), bd(2L)), mq.getColumnInterval(calc1, 10))
     assertNull(mq.getColumnInterval(calc1, 11))
 
     // calc => project + filter(id <= 20 AND id > 10 AND DIV(id, 2) > 3)
     val calc2 = createLogicalCalc(
       studentLogicalScan, outputRowType, projects, List(expr1, expr2, expr3))
-    assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(calc2, 0))
+    assertEquals(
+      ValueInterval(bd(10), bd(20), includeLower = false), mq.getColumnInterval(calc2, 0))
     assertNull(mq.getColumnInterval(calc2, 1))
 
     // calc => project + filter(id <= 20 AND id > 10 AND score < 4.1)
     val calc3 = createLogicalCalc(
       studentLogicalScan, outputRowType, projects, List(expr1, expr2, expr4))
-    assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(calc3, 0))
+    assertEquals(
+      ValueInterval(bd(10), bd(20), includeLower = false), mq.getColumnInterval(calc3, 0))
 
     // calc => project + filter(score > 6.0 OR score <= 4.0)
     val calc4 = createLogicalCalc(
       studentLogicalScan, outputRowType, projects, List(relBuilder.call(OR, expr5, expr6)))
-    assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(calc4, 2))
+    assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(calc4, 2))
 
     // calc => project + filter(score > 6.0 OR score <= 4.0 OR id < 20)
     val calc5 = createLogicalCalc(studentLogicalScan, outputRowType, projects,
       List(relBuilder.call(OR, expr5, expr6, expr1)))
-    assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(calc5, 2))
+    assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(calc5, 2))
 
     // calc => project + filter((id <= 20 AND score < 4.1) OR NOT(DIV(id, 2) > 3 OR score > 1.9))
     val calc6 = createLogicalCalc(studentLogicalScan, outputRowType, projects,
       List(relBuilder.call(OR,
         relBuilder.call(AND, expr1, expr4),
         relBuilder.call(NOT, relBuilder.call(OR, expr3, expr7)))))
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(calc6, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(calc6, 0))
 
     // calc => project + filter: ($0 <=2 and $1 < 1.1) or not( $0>2 or $1 > 1.9)
     val calc7 = createLogicalCalc(studentLogicalScan, outputRowType, projects,
@@ -259,7 +266,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
           relBuilder.call(OR,
             RexUtil.negate(relBuilder.getRexBuilder, expr1.asInstanceOf[RexCall]),
             expr7)))))
-    assertEquals(ValueInterval(0, 20), mq.getColumnInterval(calc7, 0))
+    assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(calc7, 0))
 
     relBuilder.push(studentLogicalScan)
     val expr8 = relBuilder.call(CASE, expr5, relBuilder.literal(1), relBuilder.literal(0))
@@ -274,24 +281,24 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     val calc8 = createLogicalCalc(
       studentLogicalScan, rowType, List(expr8, expr9, expr10, expr11), List())
 
-    assertEquals(ValueInterval(0, 1), mq.getColumnInterval(calc8, 0))
-    assertEquals(ValueInterval(10, 12), mq.getColumnInterval(calc8, 1))
-    assertEquals(ValueInterval(0, 12), mq.getColumnInterval(calc8, 2))
-    assertEquals(ValueInterval(1, 18), mq.getColumnInterval(calc8, 3))
+    assertEquals(ValueInterval(bd(0), bd(1)), mq.getColumnInterval(calc8, 0))
+    assertEquals(ValueInterval(bd(10), bd(12)), mq.getColumnInterval(calc8, 1))
+    assertEquals(ValueInterval(bd(0), bd(12)), mq.getColumnInterval(calc8, 2))
+    assertEquals(ValueInterval(bd(1), bd(18)), mq.getColumnInterval(calc8, 3))
   }
 
   @Test
   def testGetColumnIntervalOnExpand(): Unit = {
     Array(logicalExpand, flinkLogicalExpand, batchExpand, streamExpand).foreach {
       expand =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(expand, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(expand, 0))
         assertNull(mq.getColumnInterval(expand, 1))
-        assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(expand, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(expand, 3))
-        assertEquals(ValueInterval(161.0, 172.1), mq.getColumnInterval(expand, 4))
+        assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(expand, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(expand, 3))
+        assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getColumnInterval(expand, 4))
         assertEquals(null, mq.getColumnInterval(expand, 5))
         assertEquals(null, mq.getColumnInterval(expand, 6))
-        assertEquals(ValueInterval(0, 5), mq.getColumnInterval(expand, 7))
+        assertEquals(ValueInterval(bd(0), bd(5)), mq.getColumnInterval(expand, 7))
     }
   }
 
@@ -302,11 +309,11 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
       logicalSortLimit, flinkLogicalSortLimit, batchSortLimit, batchLocalSortLimit,
       batchGlobalSortLimit, streamSortLimit).foreach {
       sort =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(sort, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(sort, 0))
         assertNull(mq.getColumnInterval(sort, 1))
-        assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(sort, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(sort, 3))
-        assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(sort, 4))
+        assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(sort, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(sort, 3))
+        assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(sort, 4))
         assertNull(mq.getColumnInterval(sort, 5))
         assertNull(mq.getColumnInterval(sort, 6))
     }
@@ -316,55 +323,53 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnRank(): Unit = {
     Array(logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank).foreach {
       rank =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(rank, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(rank, 0))
         assertNull(mq.getColumnInterval(rank, 1))
-        assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(rank, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(rank, 3))
-        assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(rank, 4))
+        assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(rank, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(rank, 3))
+        assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(rank, 4))
         assertNull(mq.getColumnInterval(rank, 5))
         assertNull(mq.getColumnInterval(rank, 6))
         rank match {
           case r: BatchExecRank if !r.isGlobal => // local batch rank does not output rank function
-          case _ => assertEquals(ValueInterval(1, 5), mq.getColumnInterval(rank, 7))
+          case _ => assertEquals(ValueInterval(bd(1), bd(5)), mq.getColumnInterval(rank, 7))
         }
     }
 
     Array(logicalRankWithVariableRange, flinkLogicalRankWithVariableRange,
       streamRankWithVariableRange).foreach {
       rank =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(logicalRankWithVariableRange, 0))
-        assertNull(mq.getColumnInterval(logicalRankWithVariableRange, 1))
-        assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval
-        (logicalRankWithVariableRange, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(logicalRankWithVariableRange, 3))
-        assertEquals(ValueInterval(161.0D, 172.1D),
-          mq.getColumnInterval(logicalRankWithVariableRange, 4))
-        assertNull(mq.getColumnInterval(logicalRankWithVariableRange, 5))
-        assertNull(mq.getColumnInterval(logicalRankWithVariableRange, 6))
-        assertEquals(ValueInterval(1, 18), mq.getColumnInterval(logicalRankWithVariableRange, 7))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(rank, 0))
+        assertNull(mq.getColumnInterval(rank, 1))
+        assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(rank, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(rank, 3))
+        assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(rank, 4))
+        assertNull(mq.getColumnInterval(rank, 5))
+        assertNull(mq.getColumnInterval(rank, 6))
+        assertEquals(ValueInterval(bd(1), bd(18)), mq.getColumnInterval(rank, 7))
     }
 
     Array(logicalRowNumber, flinkLogicalRowNumber, streamRowNumber).foreach {
       rank =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(rank, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(rank, 0))
         assertNull(mq.getColumnInterval(rank, 1))
-        assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(rank, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(rank, 3))
-        assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(rank, 4))
+        assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(rank, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(rank, 3))
+        assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(rank, 4))
         assertNull(mq.getColumnInterval(rank, 5))
         assertNull(mq.getColumnInterval(rank, 6))
-        assertEquals(ValueInterval(3, 6), mq.getColumnInterval(rank, 7))
+        assertEquals(ValueInterval(bd(3), bd(6)), mq.getColumnInterval(rank, 7))
     }
   }
 
   @Test
   def testGetColumnIntervalOnExchange(): Unit = {
     val exchange = LogicalExchange.create(studentLogicalScan, RelDistributions.SINGLETON)
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(exchange, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(exchange, 0))
     assertNull(mq.getColumnInterval(exchange, 1))
-    assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(exchange, 2))
-    assertEquals(ValueInterval(12, 18), mq.getColumnInterval(exchange, 3))
-    assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(exchange, 4))
+    assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(exchange, 2))
+    assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(exchange, 3))
+    assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(exchange, 4))
     assertNull(mq.getColumnInterval(exchange, 5))
     assertNull(mq.getColumnInterval(exchange, 6))
   }
@@ -373,22 +378,22 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnAggregate(): Unit = {
     Array(logicalAgg, flinkLogicalAgg).foreach {
       agg =>
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 0))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 0))
         assertNull(mq.getColumnInterval(agg, 1))
-        assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 2))
+        assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 2))
         assertNull(mq.getColumnInterval(agg, 3))
         assertNull(mq.getColumnInterval(agg, 4))
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
     }
 
     Array(logicalAggWithAuxGroup, flinkLogicalAggWithAuxGroup).foreach {
       agg =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 0))
         assertNull(mq.getColumnInterval(agg, 1))
-        assertEquals(ValueInterval(161.0, 172.1), mq.getColumnInterval(agg, 2))
+        assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getColumnInterval(agg, 2))
         assertNull(mq.getColumnInterval(agg, 3))
-        assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 4))
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+        assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 4))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
     }
   }
 
@@ -396,40 +401,40 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnBatchExecAggregate(): Unit = {
     Array(batchGlobalAggWithLocal, batchGlobalAggWithoutLocal).foreach {
       agg =>
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 0))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 0))
         assertNull(mq.getColumnInterval(agg, 1))
-        assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 2))
+        assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 2))
         assertNull(mq.getColumnInterval(agg, 3))
         assertNull(mq.getColumnInterval(agg, 4))
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
     }
 
-    assertEquals(ValueInterval(12, 18), mq.getColumnInterval(batchLocalAgg, 0))
+    assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(batchLocalAgg, 0))
     assertNull(mq.getColumnInterval(batchLocalAgg, 1))
     assertNull(mq.getColumnInterval(batchLocalAgg, 2))
-    assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(batchLocalAgg, 3))
+    assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(batchLocalAgg, 3))
     assertNull(mq.getColumnInterval(batchLocalAgg, 4))
     assertNull(mq.getColumnInterval(batchLocalAgg, 5))
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalAgg, 6))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalAgg, 6))
 
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 0))
     assertNull(mq.getColumnInterval(batchLocalAggWithAuxGroup, 1))
-    assertEquals(ValueInterval(161.0, 172.1),
+    assertEquals(ValueInterval(bd(161.0), bd(172.1)),
       mq.getColumnInterval(batchLocalAggWithAuxGroup, 2))
     assertNull(mq.getColumnInterval(batchLocalAggWithAuxGroup, 3))
     assertNull(mq.getColumnInterval(batchLocalAggWithAuxGroup, 4))
-    assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 5))
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 6))
+    assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 5))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 6))
 
     Array(batchGlobalAggWithLocalWithAuxGroup, batchGlobalAggWithoutLocalWithAuxGroup)
       .foreach {
         agg =>
-          assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 0))
+          assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 0))
           assertNull(mq.getColumnInterval(agg, 1))
-          assertEquals(ValueInterval(161.0, 172.1), mq.getColumnInterval(agg, 2))
+          assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getColumnInterval(agg, 2))
           assertNull(mq.getColumnInterval(agg, 3))
-          assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 4))
-          assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+          assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 4))
+          assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
       }
   }
 
@@ -437,28 +442,28 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnStreamExecAggregate(): Unit = {
     Array(streamGlobalAggWithLocal, streamGlobalAggWithoutLocal).foreach {
       agg =>
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 0))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 0))
         assertNull(mq.getColumnInterval(agg, 1))
-        assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 2))
+        assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 2))
         assertNull(mq.getColumnInterval(agg, 3))
         assertNull(mq.getColumnInterval(agg, 4))
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
     }
 
-    assertEquals(ValueInterval(12, 18), mq.getColumnInterval(streamLocalAgg, 0))
+    assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(streamLocalAgg, 0))
     assertNull(mq.getColumnInterval(streamLocalAgg, 1))
     assertNull(mq.getColumnInterval(streamLocalAgg, 2))
-    assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(streamLocalAgg, 3))
+    assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(streamLocalAgg, 3))
     assertNull(mq.getColumnInterval(streamLocalAgg, 4))
     assertNull(mq.getColumnInterval(streamLocalAgg, 5))
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(streamLocalAgg, 6))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(streamLocalAgg, 6))
   }
 
   @Test
   def testGetColumnIntervalOnTableAggregate(): Unit = {
     Array(logicalTableAgg, flinkLogicalTableAgg, streamExecTableAgg).foreach {
       agg =>
-        assertEquals(RightSemiInfiniteValueInterval(0, true), mq.getColumnInterval(agg, 0))
+        assertEquals(RightSemiInfiniteValueInterval(bd(0), true), mq.getColumnInterval(agg, 0))
         assertNull(mq.getColumnInterval(agg, 1))
         assertNull(mq.getColumnInterval(agg, 2))
     }
@@ -467,7 +472,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   @Test
   def testGetColumnIntervalOnWindowTableAgg(): Unit = {
     Array(logicalWindowTableAgg, flinkLogicalWindowTableAgg, streamWindowTableAgg).foreach { agg =>
-      assertEquals(ValueInterval(5, 45), mq.getColumnInterval(agg, 0))
+      assertEquals(ValueInterval(bd(5), bd(45)), mq.getColumnInterval(agg, 0))
       assertEquals(null, mq.getColumnInterval(agg, 1))
       assertEquals(null, mq.getColumnInterval(agg, 2))
       assertEquals(null, mq.getColumnInterval(agg, 3))
@@ -481,29 +486,33 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnWindowAgg(): Unit = {
     Array(logicalWindowAgg, flinkLogicalWindowAgg, batchGlobalWindowAggWithLocalAgg,
       batchGlobalWindowAggWithoutLocalAgg, streamWindowAgg).foreach { agg =>
-      assertEquals(ValueInterval(5, 45), mq.getColumnInterval(agg, 0))
+      assertEquals(ValueInterval(bd(5), bd(45)), mq.getColumnInterval(agg, 0))
       assertEquals(null, mq.getColumnInterval(agg, 1))
-      assertEquals(RightSemiInfiniteValueInterval(0), mq.getColumnInterval(agg, 2))
+      assertEquals(RightSemiInfiniteValueInterval(bd(0)), mq.getColumnInterval(agg, 2))
       assertEquals(null, mq.getColumnInterval(agg, 3))
     }
-    assertEquals(ValueInterval(5, 45), mq.getColumnInterval(batchLocalWindowAgg, 0))
+    assertEquals(ValueInterval(bd(5), bd(45)), mq.getColumnInterval(batchLocalWindowAgg, 0))
     assertEquals(null, mq.getColumnInterval(batchLocalWindowAgg, 1))
     assertEquals(null, mq.getColumnInterval(batchLocalWindowAgg, 2))
-    assertEquals(RightSemiInfiniteValueInterval(0), mq.getColumnInterval(batchLocalWindowAgg, 3))
+    assertEquals(
+      RightSemiInfiniteValueInterval(bd(0)), mq.getColumnInterval(batchLocalWindowAgg, 3))
     assertEquals(null, mq.getColumnInterval(batchLocalWindowAgg, 4))
 
     Array(logicalWindowAggWithAuxGroup, flinkLogicalWindowAggWithAuxGroup,
       batchGlobalWindowAggWithLocalAggWithAuxGroup,
       batchGlobalWindowAggWithoutLocalAggWithAuxGroup).foreach { agg =>
-      assertEquals(ValueInterval(5, 55), mq.getColumnInterval(agg, 0))
-      assertEquals(ValueInterval(0, 50), mq.getColumnInterval(agg, 1))
-      assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 2))
+      assertEquals(ValueInterval(bd(5), bd(55)), mq.getColumnInterval(agg, 0))
+      assertEquals(ValueInterval(bd(0), bd(50)), mq.getColumnInterval(agg, 1))
+      assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 2))
       assertEquals(null, mq.getColumnInterval(agg, 3))
     }
-    assertEquals(ValueInterval(5, 55), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 0))
+    assertEquals(
+      ValueInterval(bd(5), bd(55)), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 0))
     assertEquals(null, mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 1))
-    assertEquals(ValueInterval(0, 50), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 2))
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 3))
+    assertEquals(
+      ValueInterval(bd(0), bd(50)), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 2))
+    assertEquals(
+      ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 3))
     assertEquals(null, mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 4))
   }
 
@@ -511,10 +520,10 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnOverAgg(): Unit = {
     Array(flinkLogicalOverAgg, batchOverAgg).foreach {
       agg =>
-        assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 0))
+        assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 0))
         assertEquals(null, mq.getColumnInterval(agg, 1))
-        assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(agg, 2))
-        assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 3))
+        assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(agg, 2))
+        assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 3))
         assertNull(mq.getColumnInterval(agg, 4))
         assertNull(mq.getColumnInterval(agg, 5))
         assertNull(mq.getColumnInterval(agg, 6))
@@ -524,10 +533,10 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
         assertNull(mq.getColumnInterval(agg, 10))
     }
 
-    assertEquals(ValueInterval(0, null), mq.getColumnInterval(streamOverAgg, 0))
+    assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(streamOverAgg, 0))
     assertEquals(null, mq.getColumnInterval(streamOverAgg, 1))
-    assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(streamOverAgg, 2))
-    assertEquals(ValueInterval(12, 18), mq.getColumnInterval(streamOverAgg, 3))
+    assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(streamOverAgg, 2))
+    assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(streamOverAgg, 3))
     assertNull(mq.getColumnInterval(streamOverAgg, 4))
     assertNull(mq.getColumnInterval(streamOverAgg, 5))
     assertNull(mq.getColumnInterval(streamOverAgg, 6))
@@ -546,31 +555,33 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
         rexBuilder.makeLiteral(1000L, longType, false))
     ).build
 
-    assertEquals(ValueInterval(100, null, includeLower = false), mq.getColumnInterval(join, 0))
-    assertEquals(ValueInterval(1L, 800000000L), mq.getColumnInterval(join, 1))
+    assertEquals(ValueInterval(bd(100), null, includeLower = false), mq.getColumnInterval(join, 0))
+    assertEquals(ValueInterval(bd(1L), bd(800000000L)), mq.getColumnInterval(join, 1))
     assertNull(mq.getColumnInterval(join, 2))
     assertNull(mq.getColumnInterval(join, 3))
-    assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(join, 4))
+    assertEquals(ValueInterval(bd(1L), bd(100L)), mq.getColumnInterval(join, 4))
     assertNull(mq.getColumnInterval(join, 5))
-    assertEquals(ValueInterval(8L, 1000L), mq.getColumnInterval(join, 6))
+    assertEquals(ValueInterval(bd(8L), bd(1000L)), mq.getColumnInterval(join, 6))
     assertNull(mq.getColumnInterval(join, 7))
     assertNull(mq.getColumnInterval(join, 8))
 
-    assertEquals(ValueInterval(0, null, includeLower = true),
+    assertEquals(ValueInterval(bd(0), null, includeLower = true),
       mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 0))
-    assertEquals(ValueInterval(1L, 800000000L),
+    assertEquals(ValueInterval(bd(1L), bd(800000000L)),
       mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 1))
     assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 2))
     assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 3))
-    assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 4))
+    assertEquals(
+      ValueInterval(bd(1L), bd(100L)), mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 4))
 
-    assertEquals(ValueInterval(0, null, includeLower = true),
+    assertEquals(ValueInterval(bd(0), null, includeLower = true),
       mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 0))
-    assertEquals(ValueInterval(1L, 800000000L),
+    assertEquals(ValueInterval(bd(1L), bd(800000000L)),
       mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 1))
     assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 2))
     assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 3))
-    assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 4))
+    assertEquals(
+      ValueInterval(bd(1L), bd(100L)), mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 4))
   }
 
   @Test
@@ -579,7 +590,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     val ts2 = relBuilder.scan("MyTable2").build()
     val union = relBuilder.push(ts1).push(ts2).union(true).build()
     assertNull(mq.getColumnInterval(union, 0))
-    assertEquals(ValueInterval(1L, 800000000L), mq.getColumnInterval(union, 1))
+    assertEquals(ValueInterval(bd(1L), bd(800000000L)), mq.getColumnInterval(union, 1))
     assertNull(mq.getColumnInterval(union, 2))
     assertNull(mq.getColumnInterval(union, 3))
   }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala
index f8a0a32..c983900 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala
@@ -17,12 +17,12 @@
  */
 package org.apache.flink.table.planner.plan.metadata
 
-import org.apache.flink.table.planner.plan.stats.{RightSemiInfiniteValueInterval,ValueInterval}
+import org.apache.flink.table.planner.plan.stats.{RightSemiInfiniteValueInterval, ValueInterval}
 import org.apache.flink.table.types.logical._
 
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, GREATER_THAN, IS_FALSE, IS_TRUE, LESS_THAN, LESS_THAN_OR_EQUAL, DIVIDE}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{DIVIDE, EQUALS, GREATER_THAN, IS_FALSE, IS_TRUE, LESS_THAN, LESS_THAN_OR_EQUAL}
 import org.junit.Assert.{assertEquals, assertNull}
 import org.junit.{Before, Test}
 
@@ -39,8 +39,8 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     relBuilder.push(ts)
     // a <= 2
     expr1 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(0), relBuilder.literal(2))
-    // a > -1
-    expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(-1))
+    // a > -1.0 (the types of `a` and literal are different)
+    expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(-1.0))
     // a / 2 > 3
     expr3 = relBuilder.call(GREATER_THAN,
       relBuilder.call(DIVIDE, relBuilder.field(0), relBuilder.literal(2)),
@@ -68,46 +68,51 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
       expr1, expr2, expr3, expr4, expr5, expr6, expr7, expr8, expr9)
   }
 
-
   @Test
   def testGetColumnIntervalOnProject(): Unit = {
     val p = relBuilder.project(projects: _*).build()
 
-    assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(p, 0, -1))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(p, 1, -1))
-    assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(p, 0, 2))
-    assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(p, 0, 3))
-    assertEquals(ValueInterval(-1, 5, includeLower = false), mq.getFilteredColumnInterval(p, 0, 5))
+    assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(p, 0, -1))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(p, 1, -1))
+    assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(p, 0, 2))
+    assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(p, 0, 3))
+    assertEquals(ValueInterval(bd(-1.0), bd(5), includeLower = false),
+      mq.getFilteredColumnInterval(p, 0, 5))
     assertEquals(
-      ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(p, 1, 7))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(p, 1, 8))
-    assertEquals(ValueInterval(-5, -1), mq.getFilteredColumnInterval(p, 0, 9))
+      ValueInterval(bd(0D), bd(1.1D), includeUpper = false), mq.getFilteredColumnInterval(p, 1, 7))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(p, 1, 8))
+    assertEquals(ValueInterval(bd(-5), bd(-1)), mq.getFilteredColumnInterval(p, 0, 9))
     assertEquals(
-      ValueInterval(1.9D, 6.1D, includeLower = false), mq.getFilteredColumnInterval(p, 1, 10))
+      ValueInterval(bd(1.9D), bd(6.1D), includeLower = false),
+      mq.getFilteredColumnInterval(p, 1, 10))
     assertEquals(
-      ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(p, 1, 11))
-    assertEquals(ValueInterval(1.1D, 6.1D), mq.getFilteredColumnInterval(p, 1, 12))
+      ValueInterval(bd(0D), bd(1.1D), includeUpper = false), mq.getFilteredColumnInterval(p, 1, 11))
+    assertEquals(ValueInterval(bd(1.1D), bd(6.1D)), mq.getFilteredColumnInterval(p, 1, 12))
   }
 
   @Test
   def testGetColumnIntervalOnFilter(): Unit = {
     val filter = relBuilder.project(projects: _*).filter(expr1).build()
 
-    assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(filter, 0, -1))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(filter, 1, -1))
-    assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(filter, 0, 2))
-    assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(filter, 0, 3))
+    assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(filter, 0, -1))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(filter, 1, -1))
+    assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(filter, 0, 2))
+    assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(filter, 0, 3))
     assertEquals(
-      ValueInterval(-1, 2, includeLower = false), mq.getFilteredColumnInterval(filter, 0, 5))
+      ValueInterval(bd(-1.0), bd(2), includeLower = false),
+      mq.getFilteredColumnInterval(filter, 0, 5))
     assertEquals(
-      ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(filter, 1, 7))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(filter, 1, 8))
-    assertEquals(ValueInterval(-5, -1), mq.getFilteredColumnInterval(filter, 0, 9))
+      ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+      mq.getFilteredColumnInterval(filter, 1, 7))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(filter, 1, 8))
+    assertEquals(ValueInterval(bd(-5), bd(-1)), mq.getFilteredColumnInterval(filter, 0, 9))
     assertEquals(
-      ValueInterval(1.9D, 6.1D, includeLower = false), mq.getFilteredColumnInterval(filter, 1, 10))
+      ValueInterval(bd(1.9D), bd(6.1D), includeLower = false),
+      mq.getFilteredColumnInterval(filter, 1, 10))
     assertEquals(
-      ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(filter, 1, 11))
-    assertEquals(ValueInterval(1.1D, 6.1D), mq.getFilteredColumnInterval(filter, 1, 12))
+      ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+      mq.getFilteredColumnInterval(filter, 1, 11))
+    assertEquals(ValueInterval(bd(1.1D), bd(6.1D)), mq.getFilteredColumnInterval(filter, 1, 12))
   }
 
   @Test
@@ -119,43 +124,47 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
         new BooleanType(), new BooleanType(), new BooleanType(), new BooleanType(),
         new BooleanType()))
     val calc = createLogicalCalc(ts, outputRowType, projects, List(expr1))
-    assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(calc, 0, -1))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(calc, 1, -1))
-    assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(calc, 0, 2))
-    assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(calc, 0, 3))
+    assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(calc, 0, -1))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(calc, 1, -1))
+    assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(calc, 0, 2))
+    assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(calc, 0, 3))
     assertEquals(
-      ValueInterval(-1, 2, includeLower = false), mq.getFilteredColumnInterval(calc, 0, 5))
+      ValueInterval(bd(-1.0), bd(2), includeLower = false),
+      mq.getFilteredColumnInterval(calc, 0, 5))
     assertEquals(
-      ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(calc, 1, 7))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(calc, 1, 8))
-    assertEquals(ValueInterval(-5, -1), mq.getFilteredColumnInterval(calc, 0, 9))
+      ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+      mq.getFilteredColumnInterval(calc, 1, 7))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(calc, 1, 8))
+    assertEquals(ValueInterval(bd(-5), bd(-1)), mq.getFilteredColumnInterval(calc, 0, 9))
     assertEquals(
-      ValueInterval(1.9D, 6.1D, includeLower = false), mq.getFilteredColumnInterval(calc, 1, 10))
+      ValueInterval(bd(1.9D), bd(6.1D), includeLower = false),
+      mq.getFilteredColumnInterval(calc, 1, 10))
     assertEquals(
-      ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(calc, 1, 11))
-    assertEquals(ValueInterval(1.1D, 6.1D), mq.getFilteredColumnInterval(calc, 1, 12))
+      ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+      mq.getFilteredColumnInterval(calc, 1, 11))
+    assertEquals(ValueInterval(bd(1.1D), bd(6.1D)), mq.getFilteredColumnInterval(calc, 1, 12))
   }
 
   @Test
   def testGetColumnIntervalOnAggregate(): Unit = {
     Array(logicalAgg, flinkLogicalAgg, batchGlobalAggWithoutLocal, batchGlobalAggWithLocal,
       streamGlobalAggWithoutLocal, streamGlobalAggWithLocal).foreach { agg =>
-      assertEquals(ValueInterval(12, 18), mq.getFilteredColumnInterval(agg, 0, -1))
+      assertEquals(ValueInterval(bd(12), bd(18)), mq.getFilteredColumnInterval(agg, 0, -1))
       assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
-      assertEquals(ValueInterval(2.7, null), mq.getFilteredColumnInterval(agg, 2, -1))
+      assertEquals(ValueInterval(bd(2.7), null), mq.getFilteredColumnInterval(agg, 2, -1))
     }
     Array(streamLocalAgg, batchLocalAgg).foreach { agg =>
-      assertEquals(ValueInterval(12, 18), mq.getFilteredColumnInterval(streamLocalAgg, 0, -1))
-      assertNull(mq.getFilteredColumnInterval(streamLocalAgg, 1, -1))
-      assertNull(mq.getFilteredColumnInterval(streamLocalAgg, 2, -1))
-      assertEquals(ValueInterval(2.7, null), mq.getFilteredColumnInterval(streamLocalAgg, 3, -1))
+      assertEquals(ValueInterval(bd(12), bd(18)), mq.getFilteredColumnInterval(agg, 0, -1))
+      assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
+      assertNull(mq.getFilteredColumnInterval(agg, 2, -1))
+      assertEquals(ValueInterval(bd(2.7), null), mq.getFilteredColumnInterval(agg, 3, -1))
     }
 
     Array(logicalAggWithAuxGroup, flinkLogicalAggWithAuxGroup,
       batchGlobalAggWithoutLocalWithAuxGroup, batchGlobalAggWithLocalWithAuxGroup).foreach { agg =>
-      assertEquals(ValueInterval(0, null), mq.getFilteredColumnInterval(agg, 0, -1))
+      assertEquals(ValueInterval(bd(0), null), mq.getFilteredColumnInterval(agg, 0, -1))
       assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
-      assertEquals(ValueInterval(161.0, 172.1), mq.getFilteredColumnInterval(agg, 2, -1))
+      assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getFilteredColumnInterval(agg, 2, -1))
       assertNull(mq.getFilteredColumnInterval(agg, 3, -1))
     }
   }
@@ -165,7 +174,7 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     Array(logicalTableAgg, flinkLogicalTableAgg, streamExecTableAgg).foreach {
       agg =>
         assertEquals(
-          RightSemiInfiniteValueInterval(0, true),
+          RightSemiInfiniteValueInterval(bd(0), true),
           mq.getFilteredColumnInterval(agg, 0, -1))
         assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
         assertNull(mq.getFilteredColumnInterval(agg, 2, -1))
@@ -176,7 +185,7 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
   def testGetColumnIntervalOnWindowTableAggregate(): Unit = {
     Array(logicalWindowTableAgg, flinkLogicalWindowTableAgg, streamWindowTableAgg).foreach {
       agg =>
-        assertEquals(ValueInterval(5, 45), mq.getFilteredColumnInterval(agg, 0, -1))
+        assertEquals(ValueInterval(bd(5), bd(45)), mq.getFilteredColumnInterval(agg, 0, -1))
         assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
         assertNull(mq.getFilteredColumnInterval(agg, 2, -1))
         assertNull(mq.getFilteredColumnInterval(agg, 3, -1))
@@ -197,10 +206,10 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
     val filter1 = relBuilder.push(ts).project(projects: _*).filter(expr1).build()
     val filter2 = relBuilder.push(ts).project(projects: _*).filter(expr7).build()
     val union = relBuilder.push(filter1).push(filter2).union(true).build()
-    assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(union, 0, -1))
-    assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(union, 1, -1))
-    assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(union, 0, 2))
-    assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(union, 0, 3))
+    assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(union, 0, -1))
+    assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(union, 1, -1))
+    assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(union, 0, 2))
+    assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(union, 0, 3))
   }
 
   @Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 76188a5..0ce686c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -129,6 +129,14 @@ class FlinkRelMdHandlerTestBase {
     streamPhysicalTraits = cluster.traitSetOf(FlinkConventions.STREAM_PHYSICAL)
   }
 
+  protected def bd(value: Long): BigDecimal = {
+    BigDecimal.valueOf(value)
+  }
+
+  protected def bd(value: Double): BigDecimal = {
+    BigDecimal.valueOf(value)
+  }
+
   protected val intType: RelDataType = typeFactory.createFieldTypeFromLogicalType(
     new IntType(false))
 
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index f01fad7..75c28ab 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -272,4 +272,10 @@ class AggregateTest extends TableTestBase {
       """.stripMargin
     util.verifyPlan(sql)
   }
+
+  @Test
+  def testColumnIntervalValidation(): Unit = {
+    // test for FLINK-16577
+    util.verifyPlan("SELECT b, SUM(a) FROM MyTable WHERE a > 0.1 and a < 10 GROUP BY b")
+  }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
index fd09f32..183634c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
@@ -189,30 +189,4 @@ class ColumnIntervalUtilTest {
     )
   }
 
-  @Test
-  def testConvertStringToNumber(): Unit = {
-    assertEqualsWithType(java.lang.Byte.valueOf("1"), "1")
-    assertEqualsWithType(java.lang.Short.valueOf("1"), "1")
-    assertEqualsWithType(java.lang.Integer.valueOf("1"), "1")
-    assertEqualsWithType(java.lang.Float.valueOf("1"), "1")
-    assertEqualsWithType(java.lang.Long.valueOf("1"), "1")
-    assertEqualsWithType(java.lang.Double.valueOf("1"), "1")
-    assertEqualsWithType(new java.math.BigDecimal("1"), "1")
-    assertEqualsWithType(new java.math.BigInteger("1"), "1")
-    assertEqualsWithType("1".toByte, "1")
-    assertEqualsWithType("1".toShort, "1")
-    assertEqualsWithType("1".toInt, "1")
-    assertEqualsWithType("1".toLong, "1")
-    assertEqualsWithType("1".toFloat, "1")
-    assertEqualsWithType("1".toDouble, "1")
-    assertEquals(None, convertStringToNumber("1", classOf[java.util.Date]))
-  }
-
-  private def assertEqualsWithType(number: Comparable[_], numberStr: String): Unit = {
-    val n = convertStringToNumber(numberStr, number.getClass)
-    assertTrue(n.isDefined)
-    assertTrue(number.getClass == n.get.getClass)
-    assertEquals(number, n.get)
-  }
-
 }