You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/06/05 03:07:52 UTC
[flink] branch release-1.11 updated: [FLINK-16577][table-planner-blink] Fix numeric type mismatch error in column interval relmetadata
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new a3a34b7 [FLINK-16577][table-planner-blink] Fix numeric type mismatch error in column interval relmetadata
a3a34b7 is described below
commit a3a34b7d6b0e2f8c9ff4974c9947604fac8f8872
Author: godfrey he <go...@163.com>
AuthorDate: Fri Jun 5 10:19:27 2020 +0800
[FLINK-16577][table-planner-blink] Fix numeric type mismatch error in column interval relmetadata
This closes #12326
---
.../plan/metadata/FlinkRelMdColumnInterval.scala | 48 ++--
.../planner/plan/utils/ColumnIntervalUtil.scala | 43 +---
.../table/planner/plan/utils/FlinkRelOptUtil.scala | 60 +----
.../planner/catalog/CatalogStatisticsTest.java | 17 +-
.../planner/plan/stream/sql/agg/AggregateTest.xml | 21 ++
.../metadata/FlinkRelMdColumnIntervalTest.scala | 273 +++++++++++----------
.../FlinkRelMdFilteredColumnIntervalTest.scala | 113 +++++----
.../plan/metadata/FlinkRelMdHandlerTestBase.scala | 8 +
.../plan/stream/sql/agg/AggregateTest.scala | 6 +
.../planner/utils/ColumnIntervalUtilTest.scala | 26 --
10 files changed, 301 insertions(+), 314 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
index 7d4ab3f..762236e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnInterval.scala
@@ -40,6 +40,8 @@ import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.{SqlBinaryOperator, SqlKind}
import org.apache.calcite.util.Util
+import java.math.{BigDecimal => JBigDecimal}
+
import scala.collection.JavaConversions._
/**
@@ -75,9 +77,9 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
(minValue == null && maxValue == null) || (max == null && min == null))
if (minValue != null || maxValue != null) {
- ValueInterval(minValue, maxValue)
+ ValueInterval(convertNumberToBigDecimal(minValue), convertNumberToBigDecimal(maxValue))
} else if (max != null || min != null) {
- ValueInterval(min, max)
+ ValueInterval(convertNumberToBigDecimal(min), convertNumberToBigDecimal(max))
} else {
null
}
@@ -86,6 +88,22 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
}
}
+ private def convertNumberToBigDecimal(number: Number): Number = {
+ if (number != null) {
+ new JBigDecimal(number.toString)
+ } else {
+ number
+ }
+ }
+
+ private def convertNumberToBigDecimal(comparable: Comparable[_]): Comparable[_] = {
+ if (comparable != null && comparable.isInstanceOf[Number]) {
+ new JBigDecimal(comparable.toString)
+ } else {
+ comparable
+ }
+ }
+
/**
* Gets interval of the given column on Values.
*
@@ -99,7 +117,9 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
if (tuples.isEmpty) {
EmptyValueInterval
} else {
- val values = tuples.map(t => FlinkRelOptUtil.getLiteralValue(t.get(index))).filter(_ != null)
+ val values = tuples.map {
+ t => FlinkRelOptUtil.getLiteralValueByBroadType(t.get(index))
+ }.filter(_ != null)
if (values.isEmpty) {
EmptyValueInterval
} else {
@@ -135,7 +155,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
projects.get(index) match {
case inputRef: RexInputRef => fmq.getColumnInterval(project.getInput, inputRef.getIndex)
case literal: RexLiteral =>
- val literalValue = FlinkRelOptUtil.getLiteralValue(literal)
+ val literalValue = FlinkRelOptUtil.getLiteralValueByBroadType(literal)
if (literalValue == null) {
ValueInterval.empty
} else {
@@ -211,7 +231,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
}
case literal: RexLiteral =>
- val literalValue = FlinkRelOptUtil.getLiteralValue(literal)
+ val literalValue = FlinkRelOptUtil.getLiteralValueByBroadType(literal)
if (literalValue == null) {
ValueInterval.empty
} else {
@@ -235,7 +255,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
fmq.getColumnInterval(baseNode.getInput, inputRef.getIndex)
case literal: RexLiteral =>
- val literalValue = FlinkRelOptUtil.getLiteralValue(literal)
+ val literalValue = FlinkRelOptUtil.getLiteralValueByBroadType(literal)
if (literalValue == null) {
ValueInterval.empty
} else {
@@ -310,7 +330,7 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
case inputRef: RexInputRef =>
Some(fmq.getColumnInterval(expand.getInput, inputRef.getIndex))
case l: RexLiteral if l.getTypeName eq SqlTypeName.DECIMAL =>
- val v = l.getValueAs(classOf[java.lang.Long])
+ val v = l.getValueAs(classOf[JBigDecimal])
Some(ValueInterval(v, v))
case l: RexLiteral if l.getValue == null =>
None
@@ -342,17 +362,14 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
val rankFunColumnIndex = RankUtil.getRankNumberColumnIndex(rank).getOrElse(-1)
if (index == rankFunColumnIndex) {
rank.rankRange match {
- case r: ConstantRankRange => ValueInterval(r.getRankStart, r.getRankEnd)
+ case r: ConstantRankRange =>
+ ValueInterval(JBigDecimal.valueOf(r.getRankStart), JBigDecimal.valueOf(r.getRankEnd))
case v: VariableRankRange =>
val interval = fmq.getColumnInterval(rank.getInput, v.getRankEndIndex)
interval match {
case hasUpper: WithUpper =>
- val lower = ColumnIntervalUtil.convertStringToNumber("1", hasUpper.upper.getClass)
- lower match {
- case Some(l) =>
- ValueInterval(l, hasUpper.upper, includeUpper = hasUpper.includeUpper)
- case _ => null
- }
+ val lower = JBigDecimal.valueOf(1)
+ ValueInterval(lower, hasUpper.upper, includeUpper = hasUpper.includeUpper)
case _ => null
}
}
@@ -646,7 +663,8 @@ class FlinkRelMdColumnInterval private extends MetadataHandler[ColumnInterval] {
} else {
null
}
- case COUNT => RightSemiInfiniteValueInterval(0, includeLower = true)
+ case COUNT =>
+ RightSemiInfiniteValueInterval(JBigDecimal.valueOf(0), includeLower = true)
// TODO add more built-in agg functions
case _ => null
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
index 3852f4f..482092c 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
@@ -20,6 +20,7 @@ package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.planner.plan.stats._
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.ColumnRelatedVisitor
+import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil.getLiteralValueByBroadType
import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, RexNode, RexUtil}
import org.apache.calcite.sql.SqlKind
@@ -228,13 +229,13 @@ object ColumnIntervalUtil {
val (literalValue, op) = (convertedCondition.operands.head, convertedCondition.operands.last)
match {
case (_: RexInputRef, literal: RexLiteral) =>
- (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind)
+ (getLiteralValueByBroadType(literal), convertedCondition.getKind)
case (rex: RexCall, literal: RexLiteral) if rex.getKind == SqlKind.AS =>
- (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind)
+ (getLiteralValueByBroadType(literal), convertedCondition.getKind)
case (literal: RexLiteral, _: RexInputRef) =>
- (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind.reverse())
+ (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
case (literal: RexLiteral, rex: RexCall) if rex.getKind == SqlKind.AS =>
- (FlinkRelOptUtil.getLiteralValue(literal), convertedCondition.getKind.reverse())
+ (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
case _ => (null, null)
}
if (op == null || literalValue == null) {
@@ -295,38 +296,4 @@ object ColumnIntervalUtil {
case _ => None
}
- def convertStringToNumber(number: String, clazz: Class[_]): Option[Comparable[_]] = {
- if (clazz == classOf[java.lang.Byte]) {
- Some(java.lang.Byte.valueOf(number))
- } else if (clazz == classOf[java.lang.Short]) {
- Some(java.lang.Short.valueOf(number))
- } else if (clazz == classOf[java.lang.Integer]) {
- Some(java.lang.Integer.valueOf(number))
- } else if (clazz == classOf[java.lang.Float]) {
- Some(java.lang.Float.valueOf(number))
- } else if (clazz == classOf[java.lang.Long]) {
- Some(java.lang.Long.valueOf(number))
- } else if (clazz == classOf[java.lang.Double]) {
- Some(java.lang.Double.valueOf(number))
- } else if (clazz == classOf[java.math.BigDecimal]) {
- Some(new java.math.BigDecimal(number))
- } else if (clazz == classOf[java.math.BigInteger]) {
- Some(new java.math.BigInteger(number))
- } else if (clazz == classOf[scala.Byte]) {
- Some(number.toByte)
- } else if (clazz == classOf[scala.Short]) {
- Some(number.toShort)
- } else if (clazz == classOf[scala.Int]) {
- Some(number.toInt)
- } else if (clazz == classOf[scala.Long]) {
- Some(number.toLong)
- } else if (clazz == classOf[scala.Float]) {
- Some(number.toFloat)
- } else if (clazz == classOf[scala.Double]) {
- Some(number.toDouble)
- } else {
- None
- }
- }
-
}
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
index 68f6598..8c2c6de 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelOptUtil.scala
@@ -18,9 +18,9 @@
package org.apache.flink.table.planner.plan.utils
import org.apache.flink.table.api.TableConfig
+import org.apache.flink.table.planner.JBoolean
import org.apache.flink.table.planner.calcite.{FlinkContext, FlinkPlannerImpl, FlinkTypeFactory}
import org.apache.flink.table.planner.plan.`trait`.{MiniBatchInterval, MiniBatchMode}
-import org.apache.flink.table.planner.{JBoolean, JByte, JDouble, JFloat, JLong, JShort}
import org.apache.calcite.config.NullCollation
import org.apache.calcite.plan.RelOptUtil
@@ -30,13 +30,11 @@ import org.apache.calcite.rex.{RexBuilder, RexCall, RexInputRef, RexLiteral, Rex
import org.apache.calcite.sql.SqlExplainLevel
import org.apache.calcite.sql.SqlKind._
import org.apache.calcite.sql.`type`.SqlTypeName._
-import org.apache.calcite.util.ImmutableBitSet
import org.apache.commons.math3.util.ArithmeticUtils
import java.io.{PrintWriter, StringWriter}
import java.math.BigDecimal
import java.sql.{Date, Time, Timestamp}
-import java.util
import java.util.Calendar
import scala.collection.JavaConversions._
@@ -207,25 +205,22 @@ object FlinkRelOptUtil {
}
/**
- * Gets values of RexLiteral
+ * Gets values of [[RexLiteral]] by its broad type.
+ *
+ * <p> All numeric values (TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, DECIMAL)
+ * will be converted to BigDecimal.
*
* @param literal input RexLiteral
- * @return values of the input RexLiteral
+ * @return value of the input RexLiteral
*/
- def getLiteralValue(literal: RexLiteral): Comparable[_] = {
+ def getLiteralValueByBroadType(literal: RexLiteral): Comparable[_] = {
if (literal.isNull) {
null
} else {
- val literalType = literal.getType
- literalType.getSqlTypeName match {
+ literal.getTypeName match {
case BOOLEAN => RexLiteral.booleanValue(literal)
- case TINYINT => literal.getValueAs(classOf[JByte])
- case SMALLINT => literal.getValueAs(classOf[JShort])
- case INTEGER => literal.getValueAs(classOf[Integer])
- case BIGINT => literal.getValueAs(classOf[JLong])
- case FLOAT => literal.getValueAs(classOf[JFloat])
- case DOUBLE => literal.getValueAs(classOf[JDouble])
- case DECIMAL => literal.getValue3.asInstanceOf[BigDecimal]
+ case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE | DECIMAL =>
+ literal.getValue3.asInstanceOf[BigDecimal]
case VARCHAR | CHAR => literal.getValueAs(classOf[String])
// temporal types
@@ -236,43 +231,12 @@ object FlinkRelOptUtil {
case TIMESTAMP =>
new Timestamp(literal.getValueAs(classOf[Calendar]).getTimeInMillis)
case _ =>
- throw new IllegalArgumentException(s"Literal type $literalType is not supported!")
+ throw new IllegalArgumentException(
+ s"Literal type ${literal.getTypeName} is not supported!")
}
}
}
- private def fix(operands: util.List[RexNode], before: Int, after: Int): Unit = {
- if (before == after) {
- return
- }
- operands.indices.foreach { i =>
- val node = operands.get(i)
- operands.set(i, RexUtil.shift(node, before, after - before))
- }
- }
-
- /**
- * Categorizes whether a bit set contains bits left and right of a line.
- */
- private object Side extends Enumeration {
- type Side = Value
- val LEFT, RIGHT, BOTH, EMPTY = Value
-
- private[plan] def of(bitSet: ImmutableBitSet, middle: Int): Side = {
- val firstBit = bitSet.nextSetBit(0)
- if (firstBit < 0) {
- return EMPTY
- }
- if (firstBit >= middle) {
- return RIGHT
- }
- if (bitSet.nextSetBit(middle) < 0) {
- return LEFT
- }
- BOTH
- }
- }
-
/**
* Partitions the [[RexNode]] in two [[RexNode]] according to a predicate.
* The result is a pair of RexNode: the first RexNode consists of RexNode that satisfy the
diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
index d7fa694..6f6d588 100644
--- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
+++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/catalog/CatalogStatisticsTest.java
@@ -55,6 +55,7 @@ import org.apache.calcite.util.ImmutableBitSet;
import org.junit.Before;
import org.junit.Test;
+import java.math.BigDecimal;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
@@ -158,7 +159,9 @@ public class CatalogStatisticsTest {
// long type
assertEquals(46.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null), 0.0);
assertEquals(154.0, mq.getColumnNullCount(t1, 0), 0.0);
- assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(t1, 0));
+ assertEquals(
+ ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123L), BigDecimal.valueOf(763322L), true, true),
+ mq.getColumnInterval(t1, 0));
// string type
assertEquals(40.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null), 0.0);
@@ -183,7 +186,9 @@ public class CatalogStatisticsTest {
// long type
assertEquals(46.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(0), null), 0.0);
assertEquals(154.0, mq.getColumnNullCount(t1, 0), 0.0);
- assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(t1, 0));
+ assertEquals(
+ ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123L), BigDecimal.valueOf(763322L), true, true),
+ mq.getColumnInterval(t1, 0));
// string type
assertEquals(40.0, mq.getDistinctRowCount(t1, ImmutableBitSet.of(1), null), 0.0);
@@ -324,7 +329,9 @@ public class CatalogStatisticsTest {
// long type
assertEquals(23.0, mq.getDistinctRowCount(rel, ImmutableBitSet.of(1), null), 0.0);
assertEquals(77.0, mq.getColumnNullCount(rel, 1), 0.0);
- assertEquals(ValueInterval$.MODULE$.apply(-123L, 763322L, true, true), mq.getColumnInterval(rel, 1));
+ assertEquals(
+ ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123L), BigDecimal.valueOf(763322L), true, true),
+ mq.getColumnInterval(rel, 1));
// string type
assertEquals(20.0, mq.getDistinctRowCount(rel, ImmutableBitSet.of(2), null), 0.0);
@@ -343,7 +350,9 @@ public class CatalogStatisticsTest {
// double type
assertEquals(73.0, mq.getDistinctRowCount(rel, ImmutableBitSet.of(4), null), 0.0);
assertEquals(27.0, mq.getColumnNullCount(rel, 4), 0.0);
- assertEquals(ValueInterval$.MODULE$.apply(-123.35, 7633.22, true, true), mq.getColumnInterval(rel, 4));
+ assertEquals(
+ ValueInterval$.MODULE$.apply(BigDecimal.valueOf(-123.35), BigDecimal.valueOf(7633.22), true, true),
+ mq.getColumnInterval(rel, 4));
}
private void alterTableStatisticsWithUnknownRowCount(
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
index b5a75b6..e4d3b2e 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.xml
@@ -215,6 +215,27 @@ GroupAggregate(select=[AVG_RETRACT(a) AS EXPR$0], changelogMode=[I,UA,D])
]]>
</Resource>
</TestCase>
+ <TestCase name="testColumnIntervalValidation">
+ <Resource name="sql">
+ <![CDATA[SELECT b, SUM(a) FROM MyTable WHERE a > 0.1 and a < 10 GROUP BY b]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
++- LogicalProject(b=[$1], a=[$0])
+ +- LogicalFilter(condition=[AND(>($0, 0.1:DECIMAL(2, 1)), <($0, 10))])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, proctime, rowtime)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+GroupAggregate(groupBy=[b], select=[b, SUM(a) AS EXPR$1])
++- Exchange(distribution=[hash[b]])
+ +- Calc(select=[b, a], where=[AND(>(a, 0.1:DECIMAL(2, 1)), <(a, 10))])
+ +- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c, proctime, rowtime)]]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testGroupByWithConstantKey">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
index fee3ce5..da04c77 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdColumnIntervalTest.scala
@@ -44,11 +44,11 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnTableScan(): Unit = {
Array(studentLogicalScan, studentFlinkLogicalScan, studentBatchScan, studentStreamScan)
.foreach { scan =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(scan, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(scan, 0))
assertNull(mq.getColumnInterval(scan, 1))
- assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(scan, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(scan, 3))
- assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(scan, 4))
+ assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(scan, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(scan, 3))
+ assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(scan, 4))
assertNull(mq.getColumnInterval(scan, 5))
assertNull(mq.getColumnInterval(scan, 6))
}
@@ -66,7 +66,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
assertEquals(ValueInterval.empty, mq.getColumnInterval(emptyValues, idx))
}
- assertEquals(ValueInterval(1L, 3L), mq.getColumnInterval(logicalValues, 0))
+ assertEquals(ValueInterval(bd(1L), bd(3L)), mq.getColumnInterval(logicalValues, 0))
assertEquals(ValueInterval(false, true), mq.getColumnInterval(logicalValues, 1))
assertEquals(ValueInterval(
new Date(new DateString(2017, 9, 1).getMillisSinceEpoch),
@@ -80,7 +80,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
new Timestamp(new TimestampString(2017, 7, 1, 1, 0, 0).getMillisSinceEpoch),
new Timestamp(new TimestampString(2017, 10, 1, 1, 0, 0).getMillisSinceEpoch)),
mq.getColumnInterval(logicalValues, 4))
- assertEquals(ValueInterval(-1D, 3.12D), mq.getColumnInterval(logicalValues, 5))
+ assertEquals(ValueInterval(bd(-1D), bd(3.12D)), mq.getColumnInterval(logicalValues, 5))
assertEquals(ValueInterval.empty, mq.getColumnInterval(logicalValues, 6))
assertEquals(ValueInterval("F", "xyz"), mq.getColumnInterval(logicalValues, 7))
}
@@ -94,17 +94,19 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
@Test
def testGetColumnIntervalOnProject(): Unit = {
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(logicalProject, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(logicalProject, 0))
assertNull(mq.getColumnInterval(logicalProject, 1))
- assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(logicalProject, 2))
- assertEqualsAsDouble(ValueInterval(11, 17), mq.getColumnInterval(logicalProject, 3))
- assertEqualsAsDouble(ValueInterval(177.1, 189.31), mq.getColumnInterval(logicalProject, 4))
+ assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(logicalProject, 2))
+ assertEqualsAsDouble(ValueInterval(bd(11), bd(17)), mq.getColumnInterval(logicalProject, 3))
+ assertEqualsAsDouble(
+ ValueInterval(bd(177.1), bd(189.31)), mq.getColumnInterval(logicalProject, 4))
assertNull(mq.getColumnInterval(logicalProject, 5))
- assertEqualsAsDouble(ValueInterval(161.0D, 172.1), mq.getColumnInterval(logicalProject, 6))
- assertEquals(ValueInterval(1, 2), mq.getColumnInterval(logicalProject, 7))
+ assertEqualsAsDouble(
+ ValueInterval(bd(161.0D), bd(172.1)), mq.getColumnInterval(logicalProject, 6))
+ assertEquals(ValueInterval(bd(1), bd(2)), mq.getColumnInterval(logicalProject, 7))
assertEquals(ValueInterval(true, true), mq.getColumnInterval(logicalProject, 8))
- assertEquals(ValueInterval(2.1D, 2.1D), mq.getColumnInterval(logicalProject, 9))
- assertEquals(ValueInterval(2L, 2L), mq.getColumnInterval(logicalProject, 10))
+ assertEquals(ValueInterval(bd(2.1D), bd(2.1D)), mq.getColumnInterval(logicalProject, 9))
+ assertEquals(ValueInterval(bd(2L), bd(2L)), mq.getColumnInterval(logicalProject, 10))
assertNull(mq.getColumnInterval(logicalProject, 11))
// 3 * (score - 2)
@@ -128,8 +130,8 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
val expr0 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(-1))
// id <= 20
val expr1 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(0), relBuilder.literal(20))
- // id > 10
- val expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(10))
+ // id > 10.0 (note: the types of id and literal are different)
+ val expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(10.0))
// DIV(id, 2) > 3
val expr3 = relBuilder.call(GREATER_THAN,
relBuilder.call(DIVIDE, relBuilder.field(0), relBuilder.literal(2)),
@@ -145,33 +147,36 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
// id > -1
val filter0 = relBuilder.push(ts).filter(expr0).build
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(filter0, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(filter0, 0))
// id <= 20
val filter1 = relBuilder.push(ts).filter(expr1).build
- assertEquals(ValueInterval(0, 20), mq.getColumnInterval(filter1, 0))
+ assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(filter1, 0))
// id <= 20 AND id > 10 AND DIV(id, 2) > 3
val filter2 = relBuilder.push(ts).filter(expr1, expr2, expr3).build
- assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(filter2, 0))
+ assertEquals(
+ ValueInterval(bd(10.0), bd(20), includeLower = false), mq.getColumnInterval(filter2, 0))
// id <= 20 AND id > 10 AND score < 4.1
val filter3 = relBuilder.push(ts).filter(expr1, expr2, expr4).build
- assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(filter3, 0))
+ assertEquals(
+ ValueInterval(bd(10.0), bd(20), includeLower = false),
+ mq.getColumnInterval(filter3, 0))
// score > 6.0 OR score <= 4.0
val filter4 = relBuilder.push(ts).filter(relBuilder.call(OR, expr5, expr6)).build
- assertEquals(ValueInterval(2.7, 4.0), mq.getColumnInterval(filter4, 2))
+ assertEquals(ValueInterval(bd(2.7), bd(4.0)), mq.getColumnInterval(filter4, 2))
// score > 6.0 OR score <= 4.0 OR id < 20
val filter5 = relBuilder.push(ts).filter(relBuilder.call(OR, expr5, expr6, expr1)).build
- assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(filter5, 2))
+ assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(filter5, 2))
// (id <= 20 AND score < 4.1) OR NOT(DIV(id, 2) > 3 OR score > 1.9)
val filter6 = relBuilder.push(ts).filter(relBuilder.call(OR,
relBuilder.call(AND, expr1, expr4),
relBuilder.call(NOT, relBuilder.call(OR, expr3, expr7)))).build
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(filter6, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(filter6, 0))
// (id <= 20 AND score < 4.1) OR NOT(id <= 20 OR score > 1.9)
val filter7 = relBuilder.push(ts).filter(relBuilder.call(OR,
@@ -180,7 +185,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
relBuilder.call(OR,
RexUtil.negate(relBuilder.getRexBuilder, expr1.asInstanceOf[RexCall]),
expr7)))).build
- assertEquals(ValueInterval(0, 20), mq.getColumnInterval(filter7, 0))
+ assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(filter7, 0))
}
@Test
@@ -210,46 +215,48 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
// calc => projects + filter(id <= 20)
val calc1 = createLogicalCalc(studentLogicalScan, outputRowType, projects, List(expr1))
- assertEquals(ValueInterval(0, 20), mq.getColumnInterval(calc1, 0))
+ assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(calc1, 0))
assertNull(mq.getColumnInterval(calc1, 1))
- assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(calc1, 2))
- assertEqualsAsDouble(ValueInterval(11, 17), mq.getColumnInterval(calc1, 3))
- assertEqualsAsDouble(ValueInterval(177.1, 189.31), mq.getColumnInterval(calc1, 4))
+ assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(calc1, 2))
+ assertEqualsAsDouble(ValueInterval(bd(11), bd(17)), mq.getColumnInterval(calc1, 3))
+ assertEqualsAsDouble(ValueInterval(bd(177.1), bd(189.31)), mq.getColumnInterval(calc1, 4))
assertNull(mq.getColumnInterval(calc1, 5))
- assertEqualsAsDouble(ValueInterval(161.0D, 172.1), mq.getColumnInterval(calc1, 6))
- assertEquals(ValueInterval(1, 2), mq.getColumnInterval(calc1, 7))
+ assertEqualsAsDouble(ValueInterval(bd(161.0D), bd(172.1)), mq.getColumnInterval(calc1, 6))
+ assertEquals(ValueInterval(bd(1), bd(2)), mq.getColumnInterval(calc1, 7))
assertEquals(ValueInterval(true, true), mq.getColumnInterval(calc1, 8))
- assertEquals(ValueInterval(2.1D, 2.1D), mq.getColumnInterval(calc1, 9))
- assertEquals(ValueInterval(2L, 2L), mq.getColumnInterval(calc1, 10))
+ assertEquals(ValueInterval(bd(2.1D), bd(2.1D)), mq.getColumnInterval(calc1, 9))
+ assertEquals(ValueInterval(bd(2L), bd(2L)), mq.getColumnInterval(calc1, 10))
assertNull(mq.getColumnInterval(calc1, 11))
// calc => project + filter(id <= 20 AND id > 10 AND DIV(id, 2) > 3)
val calc2 = createLogicalCalc(
studentLogicalScan, outputRowType, projects, List(expr1, expr2, expr3))
- assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(calc2, 0))
+ assertEquals(
+ ValueInterval(bd(10), bd(20), includeLower = false), mq.getColumnInterval(calc2, 0))
assertNull(mq.getColumnInterval(calc2, 1))
// calc => project + filter(id <= 20 AND id > 10 AND score < 4.1)
val calc3 = createLogicalCalc(
studentLogicalScan, outputRowType, projects, List(expr1, expr2, expr4))
- assertEquals(ValueInterval(10, 20, includeLower = false), mq.getColumnInterval(calc3, 0))
+ assertEquals(
+ ValueInterval(bd(10), bd(20), includeLower = false), mq.getColumnInterval(calc3, 0))
// calc => project + filter(score > 6.0 OR score <= 4.0)
val calc4 = createLogicalCalc(
studentLogicalScan, outputRowType, projects, List(relBuilder.call(OR, expr5, expr6)))
- assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(calc4, 2))
+ assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(calc4, 2))
// calc => project + filter(score > 6.0 OR score <= 4.0 OR id < 20)
val calc5 = createLogicalCalc(studentLogicalScan, outputRowType, projects,
List(relBuilder.call(OR, expr5, expr6, expr1)))
- assertEqualsAsDouble(ValueInterval(2.9, 5.0), mq.getColumnInterval(calc5, 2))
+ assertEqualsAsDouble(ValueInterval(bd(2.9), bd(5.0)), mq.getColumnInterval(calc5, 2))
// calc => project + filter((id <= 20 AND score < 4.1) OR NOT(DIV(id, 2) > 3 OR score > 1.9))
val calc6 = createLogicalCalc(studentLogicalScan, outputRowType, projects,
List(relBuilder.call(OR,
relBuilder.call(AND, expr1, expr4),
relBuilder.call(NOT, relBuilder.call(OR, expr3, expr7)))))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(calc6, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(calc6, 0))
// calc => project + filter: ($0 <=2 and $1 < 1.1) or not( $0>2 or $1 > 1.9)
val calc7 = createLogicalCalc(studentLogicalScan, outputRowType, projects,
@@ -259,7 +266,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
relBuilder.call(OR,
RexUtil.negate(relBuilder.getRexBuilder, expr1.asInstanceOf[RexCall]),
expr7)))))
- assertEquals(ValueInterval(0, 20), mq.getColumnInterval(calc7, 0))
+ assertEquals(ValueInterval(bd(0), bd(20)), mq.getColumnInterval(calc7, 0))
relBuilder.push(studentLogicalScan)
val expr8 = relBuilder.call(CASE, expr5, relBuilder.literal(1), relBuilder.literal(0))
@@ -274,24 +281,24 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
val calc8 = createLogicalCalc(
studentLogicalScan, rowType, List(expr8, expr9, expr10, expr11), List())
- assertEquals(ValueInterval(0, 1), mq.getColumnInterval(calc8, 0))
- assertEquals(ValueInterval(10, 12), mq.getColumnInterval(calc8, 1))
- assertEquals(ValueInterval(0, 12), mq.getColumnInterval(calc8, 2))
- assertEquals(ValueInterval(1, 18), mq.getColumnInterval(calc8, 3))
+ assertEquals(ValueInterval(bd(0), bd(1)), mq.getColumnInterval(calc8, 0))
+ assertEquals(ValueInterval(bd(10), bd(12)), mq.getColumnInterval(calc8, 1))
+ assertEquals(ValueInterval(bd(0), bd(12)), mq.getColumnInterval(calc8, 2))
+ assertEquals(ValueInterval(bd(1), bd(18)), mq.getColumnInterval(calc8, 3))
}
@Test
def testGetColumnIntervalOnExpand(): Unit = {
Array(logicalExpand, flinkLogicalExpand, batchExpand, streamExpand).foreach {
expand =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(expand, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(expand, 0))
assertNull(mq.getColumnInterval(expand, 1))
- assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(expand, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(expand, 3))
- assertEquals(ValueInterval(161.0, 172.1), mq.getColumnInterval(expand, 4))
+ assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(expand, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(expand, 3))
+ assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getColumnInterval(expand, 4))
assertEquals(null, mq.getColumnInterval(expand, 5))
assertEquals(null, mq.getColumnInterval(expand, 6))
- assertEquals(ValueInterval(0, 5), mq.getColumnInterval(expand, 7))
+ assertEquals(ValueInterval(bd(0), bd(5)), mq.getColumnInterval(expand, 7))
}
}
@@ -302,11 +309,11 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
logicalSortLimit, flinkLogicalSortLimit, batchSortLimit, batchLocalSortLimit,
batchGlobalSortLimit, streamSortLimit).foreach {
sort =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(sort, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(sort, 0))
assertNull(mq.getColumnInterval(sort, 1))
- assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(sort, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(sort, 3))
- assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(sort, 4))
+ assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(sort, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(sort, 3))
+ assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(sort, 4))
assertNull(mq.getColumnInterval(sort, 5))
assertNull(mq.getColumnInterval(sort, 6))
}
@@ -316,55 +323,53 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnRank(): Unit = {
Array(logicalRank, flinkLogicalRank, batchLocalRank, batchGlobalRank, streamRank).foreach {
rank =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(rank, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(rank, 0))
assertNull(mq.getColumnInterval(rank, 1))
- assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(rank, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(rank, 3))
- assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(rank, 4))
+ assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(rank, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(rank, 3))
+ assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(rank, 4))
assertNull(mq.getColumnInterval(rank, 5))
assertNull(mq.getColumnInterval(rank, 6))
rank match {
case r: BatchExecRank if !r.isGlobal => // local batch rank does not output rank function
- case _ => assertEquals(ValueInterval(1, 5), mq.getColumnInterval(rank, 7))
+ case _ => assertEquals(ValueInterval(bd(1), bd(5)), mq.getColumnInterval(rank, 7))
}
}
Array(logicalRankWithVariableRange, flinkLogicalRankWithVariableRange,
streamRankWithVariableRange).foreach {
rank =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(logicalRankWithVariableRange, 0))
- assertNull(mq.getColumnInterval(logicalRankWithVariableRange, 1))
- assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval
- (logicalRankWithVariableRange, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(logicalRankWithVariableRange, 3))
- assertEquals(ValueInterval(161.0D, 172.1D),
- mq.getColumnInterval(logicalRankWithVariableRange, 4))
- assertNull(mq.getColumnInterval(logicalRankWithVariableRange, 5))
- assertNull(mq.getColumnInterval(logicalRankWithVariableRange, 6))
- assertEquals(ValueInterval(1, 18), mq.getColumnInterval(logicalRankWithVariableRange, 7))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(rank, 0))
+ assertNull(mq.getColumnInterval(rank, 1))
+ assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(rank, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(rank, 3))
+ assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(rank, 4))
+ assertNull(mq.getColumnInterval(rank, 5))
+ assertNull(mq.getColumnInterval(rank, 6))
+ assertEquals(ValueInterval(bd(1), bd(18)), mq.getColumnInterval(rank, 7))
}
Array(logicalRowNumber, flinkLogicalRowNumber, streamRowNumber).foreach {
rank =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(rank, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(rank, 0))
assertNull(mq.getColumnInterval(rank, 1))
- assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(rank, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(rank, 3))
- assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(rank, 4))
+ assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(rank, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(rank, 3))
+ assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(rank, 4))
assertNull(mq.getColumnInterval(rank, 5))
assertNull(mq.getColumnInterval(rank, 6))
- assertEquals(ValueInterval(3, 6), mq.getColumnInterval(rank, 7))
+ assertEquals(ValueInterval(bd(3), bd(6)), mq.getColumnInterval(rank, 7))
}
}
@Test
def testGetColumnIntervalOnExchange(): Unit = {
val exchange = LogicalExchange.create(studentLogicalScan, RelDistributions.SINGLETON)
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(exchange, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(exchange, 0))
assertNull(mq.getColumnInterval(exchange, 1))
- assertEquals(ValueInterval(2.7D, 4.8D), mq.getColumnInterval(exchange, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(exchange, 3))
- assertEquals(ValueInterval(161.0D, 172.1D), mq.getColumnInterval(exchange, 4))
+ assertEquals(ValueInterval(bd(2.7D), bd(4.8D)), mq.getColumnInterval(exchange, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(exchange, 3))
+ assertEquals(ValueInterval(bd(161.0D), bd(172.1D)), mq.getColumnInterval(exchange, 4))
assertNull(mq.getColumnInterval(exchange, 5))
assertNull(mq.getColumnInterval(exchange, 6))
}
@@ -373,22 +378,22 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnAggregate(): Unit = {
Array(logicalAgg, flinkLogicalAgg).foreach {
agg =>
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 0))
assertNull(mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 2))
assertNull(mq.getColumnInterval(agg, 3))
assertNull(mq.getColumnInterval(agg, 4))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
}
Array(logicalAggWithAuxGroup, flinkLogicalAggWithAuxGroup).foreach {
agg =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 0))
assertNull(mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(161.0, 172.1), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getColumnInterval(agg, 2))
assertNull(mq.getColumnInterval(agg, 3))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 4))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 4))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
}
}
@@ -396,40 +401,40 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnBatchExecAggregate(): Unit = {
Array(batchGlobalAggWithLocal, batchGlobalAggWithoutLocal).foreach {
agg =>
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 0))
assertNull(mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 2))
assertNull(mq.getColumnInterval(agg, 3))
assertNull(mq.getColumnInterval(agg, 4))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
}
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(batchLocalAgg, 0))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(batchLocalAgg, 0))
assertNull(mq.getColumnInterval(batchLocalAgg, 1))
assertNull(mq.getColumnInterval(batchLocalAgg, 2))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(batchLocalAgg, 3))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(batchLocalAgg, 3))
assertNull(mq.getColumnInterval(batchLocalAgg, 4))
assertNull(mq.getColumnInterval(batchLocalAgg, 5))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalAgg, 6))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalAgg, 6))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 0))
assertNull(mq.getColumnInterval(batchLocalAggWithAuxGroup, 1))
- assertEquals(ValueInterval(161.0, 172.1),
+ assertEquals(ValueInterval(bd(161.0), bd(172.1)),
mq.getColumnInterval(batchLocalAggWithAuxGroup, 2))
assertNull(mq.getColumnInterval(batchLocalAggWithAuxGroup, 3))
assertNull(mq.getColumnInterval(batchLocalAggWithAuxGroup, 4))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 5))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 6))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 5))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalAggWithAuxGroup, 6))
Array(batchGlobalAggWithLocalWithAuxGroup, batchGlobalAggWithoutLocalWithAuxGroup)
.foreach {
agg =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 0))
assertNull(mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(161.0, 172.1), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getColumnInterval(agg, 2))
assertNull(mq.getColumnInterval(agg, 3))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 4))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 4))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
}
}
@@ -437,28 +442,28 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnStreamExecAggregate(): Unit = {
Array(streamGlobalAggWithLocal, streamGlobalAggWithoutLocal).foreach {
agg =>
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 0))
assertNull(mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(agg, 2))
assertNull(mq.getColumnInterval(agg, 3))
assertNull(mq.getColumnInterval(agg, 4))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 5))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 5))
}
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(streamLocalAgg, 0))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(streamLocalAgg, 0))
assertNull(mq.getColumnInterval(streamLocalAgg, 1))
assertNull(mq.getColumnInterval(streamLocalAgg, 2))
- assertEquals(ValueInterval(2.7, null), mq.getColumnInterval(streamLocalAgg, 3))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getColumnInterval(streamLocalAgg, 3))
assertNull(mq.getColumnInterval(streamLocalAgg, 4))
assertNull(mq.getColumnInterval(streamLocalAgg, 5))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(streamLocalAgg, 6))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(streamLocalAgg, 6))
}
@Test
def testGetColumnIntervalOnTableAggregate(): Unit = {
Array(logicalTableAgg, flinkLogicalTableAgg, streamExecTableAgg).foreach {
agg =>
- assertEquals(RightSemiInfiniteValueInterval(0, true), mq.getColumnInterval(agg, 0))
+ assertEquals(RightSemiInfiniteValueInterval(bd(0), true), mq.getColumnInterval(agg, 0))
assertNull(mq.getColumnInterval(agg, 1))
assertNull(mq.getColumnInterval(agg, 2))
}
@@ -467,7 +472,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
@Test
def testGetColumnIntervalOnWindowTableAgg(): Unit = {
Array(logicalWindowTableAgg, flinkLogicalWindowTableAgg, streamWindowTableAgg).foreach { agg =>
- assertEquals(ValueInterval(5, 45), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(5), bd(45)), mq.getColumnInterval(agg, 0))
assertEquals(null, mq.getColumnInterval(agg, 1))
assertEquals(null, mq.getColumnInterval(agg, 2))
assertEquals(null, mq.getColumnInterval(agg, 3))
@@ -481,29 +486,33 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnWindowAgg(): Unit = {
Array(logicalWindowAgg, flinkLogicalWindowAgg, batchGlobalWindowAggWithLocalAgg,
batchGlobalWindowAggWithoutLocalAgg, streamWindowAgg).foreach { agg =>
- assertEquals(ValueInterval(5, 45), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(5), bd(45)), mq.getColumnInterval(agg, 0))
assertEquals(null, mq.getColumnInterval(agg, 1))
- assertEquals(RightSemiInfiniteValueInterval(0), mq.getColumnInterval(agg, 2))
+ assertEquals(RightSemiInfiniteValueInterval(bd(0)), mq.getColumnInterval(agg, 2))
assertEquals(null, mq.getColumnInterval(agg, 3))
}
- assertEquals(ValueInterval(5, 45), mq.getColumnInterval(batchLocalWindowAgg, 0))
+ assertEquals(ValueInterval(bd(5), bd(45)), mq.getColumnInterval(batchLocalWindowAgg, 0))
assertEquals(null, mq.getColumnInterval(batchLocalWindowAgg, 1))
assertEquals(null, mq.getColumnInterval(batchLocalWindowAgg, 2))
- assertEquals(RightSemiInfiniteValueInterval(0), mq.getColumnInterval(batchLocalWindowAgg, 3))
+ assertEquals(
+ RightSemiInfiniteValueInterval(bd(0)), mq.getColumnInterval(batchLocalWindowAgg, 3))
assertEquals(null, mq.getColumnInterval(batchLocalWindowAgg, 4))
Array(logicalWindowAggWithAuxGroup, flinkLogicalWindowAggWithAuxGroup,
batchGlobalWindowAggWithLocalAggWithAuxGroup,
batchGlobalWindowAggWithoutLocalAggWithAuxGroup).foreach { agg =>
- assertEquals(ValueInterval(5, 55), mq.getColumnInterval(agg, 0))
- assertEquals(ValueInterval(0, 50), mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(5), bd(55)), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(0), bd(50)), mq.getColumnInterval(agg, 1))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 2))
assertEquals(null, mq.getColumnInterval(agg, 3))
}
- assertEquals(ValueInterval(5, 55), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 0))
+ assertEquals(
+ ValueInterval(bd(5), bd(55)), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 0))
assertEquals(null, mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 1))
- assertEquals(ValueInterval(0, 50), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 2))
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 3))
+ assertEquals(
+ ValueInterval(bd(0), bd(50)), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 2))
+ assertEquals(
+ ValueInterval(bd(0), null), mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 3))
assertEquals(null, mq.getColumnInterval(batchLocalWindowAggWithAuxGroup, 4))
}
@@ -511,10 +520,10 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnOverAgg(): Unit = {
Array(flinkLogicalOverAgg, batchOverAgg).foreach {
agg =>
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(agg, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(agg, 0))
assertEquals(null, mq.getColumnInterval(agg, 1))
- assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(agg, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(agg, 3))
+ assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(agg, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(agg, 3))
assertNull(mq.getColumnInterval(agg, 4))
assertNull(mq.getColumnInterval(agg, 5))
assertNull(mq.getColumnInterval(agg, 6))
@@ -524,10 +533,10 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
assertNull(mq.getColumnInterval(agg, 10))
}
- assertEquals(ValueInterval(0, null), mq.getColumnInterval(streamOverAgg, 0))
+ assertEquals(ValueInterval(bd(0), null), mq.getColumnInterval(streamOverAgg, 0))
assertEquals(null, mq.getColumnInterval(streamOverAgg, 1))
- assertEquals(ValueInterval(2.7, 4.8), mq.getColumnInterval(streamOverAgg, 2))
- assertEquals(ValueInterval(12, 18), mq.getColumnInterval(streamOverAgg, 3))
+ assertEquals(ValueInterval(bd(2.7), bd(4.8)), mq.getColumnInterval(streamOverAgg, 2))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getColumnInterval(streamOverAgg, 3))
assertNull(mq.getColumnInterval(streamOverAgg, 4))
assertNull(mq.getColumnInterval(streamOverAgg, 5))
assertNull(mq.getColumnInterval(streamOverAgg, 6))
@@ -546,31 +555,33 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
rexBuilder.makeLiteral(1000L, longType, false))
).build
- assertEquals(ValueInterval(100, null, includeLower = false), mq.getColumnInterval(join, 0))
- assertEquals(ValueInterval(1L, 800000000L), mq.getColumnInterval(join, 1))
+ assertEquals(ValueInterval(bd(100), null, includeLower = false), mq.getColumnInterval(join, 0))
+ assertEquals(ValueInterval(bd(1L), bd(800000000L)), mq.getColumnInterval(join, 1))
assertNull(mq.getColumnInterval(join, 2))
assertNull(mq.getColumnInterval(join, 3))
- assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(join, 4))
+ assertEquals(ValueInterval(bd(1L), bd(100L)), mq.getColumnInterval(join, 4))
assertNull(mq.getColumnInterval(join, 5))
- assertEquals(ValueInterval(8L, 1000L), mq.getColumnInterval(join, 6))
+ assertEquals(ValueInterval(bd(8L), bd(1000L)), mq.getColumnInterval(join, 6))
assertNull(mq.getColumnInterval(join, 7))
assertNull(mq.getColumnInterval(join, 8))
- assertEquals(ValueInterval(0, null, includeLower = true),
+ assertEquals(ValueInterval(bd(0), null, includeLower = true),
mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 0))
- assertEquals(ValueInterval(1L, 800000000L),
+ assertEquals(ValueInterval(bd(1L), bd(800000000L)),
mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 1))
assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 2))
assertNull(mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 3))
- assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 4))
+ assertEquals(
+ ValueInterval(bd(1L), bd(100L)), mq.getColumnInterval(logicalSemiJoinNotOnUniqueKeys, 4))
- assertEquals(ValueInterval(0, null, includeLower = true),
+ assertEquals(ValueInterval(bd(0), null, includeLower = true),
mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 0))
- assertEquals(ValueInterval(1L, 800000000L),
+ assertEquals(ValueInterval(bd(1L), bd(800000000L)),
mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 1))
assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 2))
assertNull(mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 3))
- assertEquals(ValueInterval(1L, 100L), mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 4))
+ assertEquals(
+ ValueInterval(bd(1L), bd(100L)), mq.getColumnInterval(logicalAntiJoinWithoutEquiCond, 4))
}
@Test
@@ -579,7 +590,7 @@ class FlinkRelMdColumnIntervalTest extends FlinkRelMdHandlerTestBase {
val ts2 = relBuilder.scan("MyTable2").build()
val union = relBuilder.push(ts1).push(ts2).union(true).build()
assertNull(mq.getColumnInterval(union, 0))
- assertEquals(ValueInterval(1L, 800000000L), mq.getColumnInterval(union, 1))
+ assertEquals(ValueInterval(bd(1L), bd(800000000L)), mq.getColumnInterval(union, 1))
assertNull(mq.getColumnInterval(union, 2))
assertNull(mq.getColumnInterval(union, 3))
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala
index f8a0a32..c983900 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdFilteredColumnIntervalTest.scala
@@ -17,12 +17,12 @@
*/
package org.apache.flink.table.planner.plan.metadata
-import org.apache.flink.table.planner.plan.stats.{RightSemiInfiniteValueInterval,ValueInterval}
+import org.apache.flink.table.planner.plan.stats.{RightSemiInfiniteValueInterval, ValueInterval}
import org.apache.flink.table.types.logical._
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable.{EQUALS, GREATER_THAN, IS_FALSE, IS_TRUE, LESS_THAN, LESS_THAN_OR_EQUAL, DIVIDE}
+import org.apache.calcite.sql.fun.SqlStdOperatorTable.{DIVIDE, EQUALS, GREATER_THAN, IS_FALSE, IS_TRUE, LESS_THAN, LESS_THAN_OR_EQUAL}
import org.junit.Assert.{assertEquals, assertNull}
import org.junit.{Before, Test}
@@ -39,8 +39,8 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
relBuilder.push(ts)
// a <= 2
expr1 = relBuilder.call(LESS_THAN_OR_EQUAL, relBuilder.field(0), relBuilder.literal(2))
- // a > -1
- expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(-1))
+ // a > -1.0 (the types of `a` and literal are different)
+ expr2 = relBuilder.call(GREATER_THAN, relBuilder.field(0), relBuilder.literal(-1.0))
// a / 2 > 3
expr3 = relBuilder.call(GREATER_THAN,
relBuilder.call(DIVIDE, relBuilder.field(0), relBuilder.literal(2)),
@@ -68,46 +68,51 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
expr1, expr2, expr3, expr4, expr5, expr6, expr7, expr8, expr9)
}
-
@Test
def testGetColumnIntervalOnProject(): Unit = {
val p = relBuilder.project(projects: _*).build()
- assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(p, 0, -1))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(p, 1, -1))
- assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(p, 0, 2))
- assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(p, 0, 3))
- assertEquals(ValueInterval(-1, 5, includeLower = false), mq.getFilteredColumnInterval(p, 0, 5))
+ assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(p, 0, -1))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(p, 1, -1))
+ assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(p, 0, 2))
+ assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(p, 0, 3))
+ assertEquals(ValueInterval(bd(-1.0), bd(5), includeLower = false),
+ mq.getFilteredColumnInterval(p, 0, 5))
assertEquals(
- ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(p, 1, 7))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(p, 1, 8))
- assertEquals(ValueInterval(-5, -1), mq.getFilteredColumnInterval(p, 0, 9))
+ ValueInterval(bd(0D), bd(1.1D), includeUpper = false), mq.getFilteredColumnInterval(p, 1, 7))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(p, 1, 8))
+ assertEquals(ValueInterval(bd(-5), bd(-1)), mq.getFilteredColumnInterval(p, 0, 9))
assertEquals(
- ValueInterval(1.9D, 6.1D, includeLower = false), mq.getFilteredColumnInterval(p, 1, 10))
+ ValueInterval(bd(1.9D), bd(6.1D), includeLower = false),
+ mq.getFilteredColumnInterval(p, 1, 10))
assertEquals(
- ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(p, 1, 11))
- assertEquals(ValueInterval(1.1D, 6.1D), mq.getFilteredColumnInterval(p, 1, 12))
+ ValueInterval(bd(0D), bd(1.1D), includeUpper = false), mq.getFilteredColumnInterval(p, 1, 11))
+ assertEquals(ValueInterval(bd(1.1D), bd(6.1D)), mq.getFilteredColumnInterval(p, 1, 12))
}
@Test
def testGetColumnIntervalOnFilter(): Unit = {
val filter = relBuilder.project(projects: _*).filter(expr1).build()
- assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(filter, 0, -1))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(filter, 1, -1))
- assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(filter, 0, 2))
- assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(filter, 0, 3))
+ assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(filter, 0, -1))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(filter, 1, -1))
+ assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(filter, 0, 2))
+ assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(filter, 0, 3))
assertEquals(
- ValueInterval(-1, 2, includeLower = false), mq.getFilteredColumnInterval(filter, 0, 5))
+ ValueInterval(bd(-1.0), bd(2), includeLower = false),
+ mq.getFilteredColumnInterval(filter, 0, 5))
assertEquals(
- ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(filter, 1, 7))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(filter, 1, 8))
- assertEquals(ValueInterval(-5, -1), mq.getFilteredColumnInterval(filter, 0, 9))
+ ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+ mq.getFilteredColumnInterval(filter, 1, 7))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(filter, 1, 8))
+ assertEquals(ValueInterval(bd(-5), bd(-1)), mq.getFilteredColumnInterval(filter, 0, 9))
assertEquals(
- ValueInterval(1.9D, 6.1D, includeLower = false), mq.getFilteredColumnInterval(filter, 1, 10))
+ ValueInterval(bd(1.9D), bd(6.1D), includeLower = false),
+ mq.getFilteredColumnInterval(filter, 1, 10))
assertEquals(
- ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(filter, 1, 11))
- assertEquals(ValueInterval(1.1D, 6.1D), mq.getFilteredColumnInterval(filter, 1, 12))
+ ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+ mq.getFilteredColumnInterval(filter, 1, 11))
+ assertEquals(ValueInterval(bd(1.1D), bd(6.1D)), mq.getFilteredColumnInterval(filter, 1, 12))
}
@Test
@@ -119,43 +124,47 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
new BooleanType(), new BooleanType(), new BooleanType(), new BooleanType(),
new BooleanType()))
val calc = createLogicalCalc(ts, outputRowType, projects, List(expr1))
- assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(calc, 0, -1))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(calc, 1, -1))
- assertEquals(ValueInterval(-5, 2), mq.getFilteredColumnInterval(calc, 0, 2))
- assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(calc, 0, 3))
+ assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(calc, 0, -1))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(calc, 1, -1))
+ assertEquals(ValueInterval(bd(-5), bd(2)), mq.getFilteredColumnInterval(calc, 0, 2))
+ assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(calc, 0, 3))
assertEquals(
- ValueInterval(-1, 2, includeLower = false), mq.getFilteredColumnInterval(calc, 0, 5))
+ ValueInterval(bd(-1.0), bd(2), includeLower = false),
+ mq.getFilteredColumnInterval(calc, 0, 5))
assertEquals(
- ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(calc, 1, 7))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(calc, 1, 8))
- assertEquals(ValueInterval(-5, -1), mq.getFilteredColumnInterval(calc, 0, 9))
+ ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+ mq.getFilteredColumnInterval(calc, 1, 7))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(calc, 1, 8))
+ assertEquals(ValueInterval(bd(-5), bd(-1)), mq.getFilteredColumnInterval(calc, 0, 9))
assertEquals(
- ValueInterval(1.9D, 6.1D, includeLower = false), mq.getFilteredColumnInterval(calc, 1, 10))
+ ValueInterval(bd(1.9D), bd(6.1D), includeLower = false),
+ mq.getFilteredColumnInterval(calc, 1, 10))
assertEquals(
- ValueInterval(0D, 1.1D, includeUpper = false), mq.getFilteredColumnInterval(calc, 1, 11))
- assertEquals(ValueInterval(1.1D, 6.1D), mq.getFilteredColumnInterval(calc, 1, 12))
+ ValueInterval(bd(0D), bd(1.1D), includeUpper = false),
+ mq.getFilteredColumnInterval(calc, 1, 11))
+ assertEquals(ValueInterval(bd(1.1D), bd(6.1D)), mq.getFilteredColumnInterval(calc, 1, 12))
}
@Test
def testGetColumnIntervalOnAggregate(): Unit = {
Array(logicalAgg, flinkLogicalAgg, batchGlobalAggWithoutLocal, batchGlobalAggWithLocal,
streamGlobalAggWithoutLocal, streamGlobalAggWithLocal).foreach { agg =>
- assertEquals(ValueInterval(12, 18), mq.getFilteredColumnInterval(agg, 0, -1))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getFilteredColumnInterval(agg, 0, -1))
assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
- assertEquals(ValueInterval(2.7, null), mq.getFilteredColumnInterval(agg, 2, -1))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getFilteredColumnInterval(agg, 2, -1))
}
Array(streamLocalAgg, batchLocalAgg).foreach { agg =>
- assertEquals(ValueInterval(12, 18), mq.getFilteredColumnInterval(streamLocalAgg, 0, -1))
- assertNull(mq.getFilteredColumnInterval(streamLocalAgg, 1, -1))
- assertNull(mq.getFilteredColumnInterval(streamLocalAgg, 2, -1))
- assertEquals(ValueInterval(2.7, null), mq.getFilteredColumnInterval(streamLocalAgg, 3, -1))
+ assertEquals(ValueInterval(bd(12), bd(18)), mq.getFilteredColumnInterval(agg, 0, -1))
+ assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
+ assertNull(mq.getFilteredColumnInterval(agg, 2, -1))
+ assertEquals(ValueInterval(bd(2.7), null), mq.getFilteredColumnInterval(agg, 3, -1))
}
Array(logicalAggWithAuxGroup, flinkLogicalAggWithAuxGroup,
batchGlobalAggWithoutLocalWithAuxGroup, batchGlobalAggWithLocalWithAuxGroup).foreach { agg =>
- assertEquals(ValueInterval(0, null), mq.getFilteredColumnInterval(agg, 0, -1))
+ assertEquals(ValueInterval(bd(0), null), mq.getFilteredColumnInterval(agg, 0, -1))
assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
- assertEquals(ValueInterval(161.0, 172.1), mq.getFilteredColumnInterval(agg, 2, -1))
+ assertEquals(ValueInterval(bd(161.0), bd(172.1)), mq.getFilteredColumnInterval(agg, 2, -1))
assertNull(mq.getFilteredColumnInterval(agg, 3, -1))
}
}
@@ -165,7 +174,7 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
Array(logicalTableAgg, flinkLogicalTableAgg, streamExecTableAgg).foreach {
agg =>
assertEquals(
- RightSemiInfiniteValueInterval(0, true),
+ RightSemiInfiniteValueInterval(bd(0), true),
mq.getFilteredColumnInterval(agg, 0, -1))
assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
assertNull(mq.getFilteredColumnInterval(agg, 2, -1))
@@ -176,7 +185,7 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
def testGetColumnIntervalOnWindowTableAggregate(): Unit = {
Array(logicalWindowTableAgg, flinkLogicalWindowTableAgg, streamWindowTableAgg).foreach {
agg =>
- assertEquals(ValueInterval(5, 45), mq.getFilteredColumnInterval(agg, 0, -1))
+ assertEquals(ValueInterval(bd(5), bd(45)), mq.getFilteredColumnInterval(agg, 0, -1))
assertNull(mq.getFilteredColumnInterval(agg, 1, -1))
assertNull(mq.getFilteredColumnInterval(agg, 2, -1))
assertNull(mq.getFilteredColumnInterval(agg, 3, -1))
@@ -197,10 +206,10 @@ class FlinkRelMdFilteredColumnIntervalTest extends FlinkRelMdHandlerTestBase {
val filter1 = relBuilder.push(ts).project(projects: _*).filter(expr1).build()
val filter2 = relBuilder.push(ts).project(projects: _*).filter(expr7).build()
val union = relBuilder.push(filter1).push(filter2).union(true).build()
- assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(union, 0, -1))
- assertEquals(ValueInterval(0D, 6.1D), mq.getFilteredColumnInterval(union, 1, -1))
- assertEquals(ValueInterval(-5, 5), mq.getFilteredColumnInterval(union, 0, 2))
- assertEquals(ValueInterval(1, 1), mq.getFilteredColumnInterval(union, 0, 3))
+ assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(union, 0, -1))
+ assertEquals(ValueInterval(bd(0D), bd(6.1D)), mq.getFilteredColumnInterval(union, 1, -1))
+ assertEquals(ValueInterval(bd(-5), bd(5)), mq.getFilteredColumnInterval(union, 0, 2))
+ assertEquals(ValueInterval(bd(1), bd(1)), mq.getFilteredColumnInterval(union, 0, 3))
}
@Test
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
index 76188a5..0ce686c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/metadata/FlinkRelMdHandlerTestBase.scala
@@ -129,6 +129,14 @@ class FlinkRelMdHandlerTestBase {
streamPhysicalTraits = cluster.traitSetOf(FlinkConventions.STREAM_PHYSICAL)
}
+ protected def bd(value: Long): BigDecimal = {
+ BigDecimal.valueOf(value)
+ }
+
+ protected def bd(value: Double): BigDecimal = {
+ BigDecimal.valueOf(value)
+ }
+
protected val intType: RelDataType = typeFactory.createFieldTypeFromLogicalType(
new IntType(false))
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
index f01fad7..75c28ab 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/AggregateTest.scala
@@ -272,4 +272,10 @@ class AggregateTest extends TableTestBase {
""".stripMargin
util.verifyPlan(sql)
}
+
+ @Test
+ def testColumnIntervalValidation(): Unit = {
+ // test for FLINK-16577
+ util.verifyPlan("SELECT b, SUM(a) FROM MyTable WHERE a > 0.1 and a < 10 GROUP BY b")
+ }
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
index fd09f32..183634c 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/utils/ColumnIntervalUtilTest.scala
@@ -189,30 +189,4 @@ class ColumnIntervalUtilTest {
)
}
- @Test
- def testConvertStringToNumber(): Unit = {
- assertEqualsWithType(java.lang.Byte.valueOf("1"), "1")
- assertEqualsWithType(java.lang.Short.valueOf("1"), "1")
- assertEqualsWithType(java.lang.Integer.valueOf("1"), "1")
- assertEqualsWithType(java.lang.Float.valueOf("1"), "1")
- assertEqualsWithType(java.lang.Long.valueOf("1"), "1")
- assertEqualsWithType(java.lang.Double.valueOf("1"), "1")
- assertEqualsWithType(new java.math.BigDecimal("1"), "1")
- assertEqualsWithType(new java.math.BigInteger("1"), "1")
- assertEqualsWithType("1".toByte, "1")
- assertEqualsWithType("1".toShort, "1")
- assertEqualsWithType("1".toInt, "1")
- assertEqualsWithType("1".toLong, "1")
- assertEqualsWithType("1".toFloat, "1")
- assertEqualsWithType("1".toDouble, "1")
- assertEquals(None, convertStringToNumber("1", classOf[java.util.Date]))
- }
-
- private def assertEqualsWithType(number: Comparable[_], numberStr: String): Unit = {
- val n = convertStringToNumber(numberStr, number.getClass)
- assertTrue(n.isDefined)
- assertTrue(number.getClass == n.get.getClass)
- assertEquals(number, n.get)
- }
-
}