You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2017/07/06 20:58:29 UTC
spark git commit: [SPARK-21323][SQL] Rename
plans.logical.statsEstimation.Range to ValueInterval
Repository: spark
Updated Branches:
refs/heads/master 48e44b24a -> bf66335ac
[SPARK-21323][SQL] Rename plans.logical.statsEstimation.Range to ValueInterval
## What changes were proposed in this pull request?
Rename org.apache.spark.sql.catalyst.plans.logical.statsEstimation.Range to ValueInterval.
The current name is identical to that of the logical operator "range", which is confusing.
Renaming it to ValueInterval is more accurate.
## How was this patch tested?
unit test
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Wang Gengliang <lt...@gmail.com>
Closes #18549 from gengliangwang/ValueInterval.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf66335a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf66335a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf66335a
Branch: refs/heads/master
Commit: bf66335acab3c0c188f6c378eb8aa6948a259cb2
Parents: 48e44b2
Author: Wang Gengliang <lt...@gmail.com>
Authored: Thu Jul 6 13:58:27 2017 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Jul 6 13:58:27 2017 -0700
----------------------------------------------------------------------
.../statsEstimation/FilterEstimation.scala | 36 ++++----
.../statsEstimation/JoinEstimation.scala | 14 +--
.../plans/logical/statsEstimation/Range.scala | 88 -------------------
.../logical/statsEstimation/ValueInterval.scala | 91 ++++++++++++++++++++
4 files changed, 117 insertions(+), 112 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 5a3bee7..e13db85 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -316,8 +316,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
// decide if the value is in [min, max] of the column.
// We currently don't store min/max for binary/string type.
// Hence, we assume it is in boundary for binary/string type.
- val statsRange = Range(colStat.min, colStat.max, attr.dataType)
- if (statsRange.contains(literal)) {
+ val statsInterval = ValueInterval(colStat.min, colStat.max, attr.dataType)
+ if (statsInterval.contains(literal)) {
if (update) {
// We update the ColumnStat structure after applying this equality predicate:
// Set distinctCount to 1, nullCount to 0, and min/max values (if exist) to the literal
@@ -388,9 +388,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
// use [min, max] to filter the original hSet
dataType match {
case _: NumericType | BooleanType | DateType | TimestampType =>
- val statsRange = Range(colStat.min, colStat.max, dataType).asInstanceOf[NumericRange]
+ val statsInterval =
+ ValueInterval(colStat.min, colStat.max, dataType).asInstanceOf[NumericValueInterval]
val validQuerySet = hSet.filter { v =>
- v != null && statsRange.contains(Literal(v, dataType))
+ v != null && statsInterval.contains(Literal(v, dataType))
}
if (validQuerySet.isEmpty) {
@@ -440,12 +441,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
update: Boolean): Option[BigDecimal] = {
val colStat = colStatsMap(attr)
- val statsRange = Range(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericRange]
- val max = statsRange.max.toBigDecimal
- val min = statsRange.min.toBigDecimal
+ val statsInterval =
+ ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
+ val max = statsInterval.max.toBigDecimal
+ val min = statsInterval.min.toBigDecimal
val ndv = BigDecimal(colStat.distinctCount)
- // determine the overlapping degree between predicate range and column's range
+ // determine the overlapping degree between predicate interval and column's interval
val numericLiteral = if (literal.dataType == BooleanType) {
if (literal.value.asInstanceOf[Boolean]) BigDecimal(1) else BigDecimal(0)
} else {
@@ -566,18 +568,18 @@ case class FilterEstimation(plan: Filter) extends Logging {
}
val colStatLeft = colStatsMap(attrLeft)
- val statsRangeLeft = Range(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
- .asInstanceOf[NumericRange]
- val maxLeft = statsRangeLeft.max
- val minLeft = statsRangeLeft.min
+ val statsIntervalLeft = ValueInterval(colStatLeft.min, colStatLeft.max, attrLeft.dataType)
+ .asInstanceOf[NumericValueInterval]
+ val maxLeft = statsIntervalLeft.max
+ val minLeft = statsIntervalLeft.min
val colStatRight = colStatsMap(attrRight)
- val statsRangeRight = Range(colStatRight.min, colStatRight.max, attrRight.dataType)
- .asInstanceOf[NumericRange]
- val maxRight = statsRangeRight.max
- val minRight = statsRangeRight.min
+ val statsIntervalRight = ValueInterval(colStatRight.min, colStatRight.max, attrRight.dataType)
+ .asInstanceOf[NumericValueInterval]
+ val maxRight = statsIntervalRight.max
+ val minRight = statsIntervalRight.min
- // determine the overlapping degree between predicate range and column's range
+ // determine the overlapping degree between predicate interval and column's interval
val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
// Left < Right or Left <= Right
http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f481969..dcbe36d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -175,9 +175,9 @@ case class InnerOuterEstimation(join: Join) extends Logging {
// Check if the two sides are disjoint
val leftKeyStats = leftStats.attributeStats(leftKey)
val rightKeyStats = rightStats.attributeStats(rightKey)
- val lRange = Range(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
- val rRange = Range(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
- if (Range.isIntersected(lRange, rRange)) {
+ val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
+ val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
+ if (ValueInterval.isIntersected(lInterval, rInterval)) {
// Get the largest ndv among pairs of join keys
val maxNdv = leftKeyStats.distinctCount.max(rightKeyStats.distinctCount)
if (maxNdv > ndvDenom) ndvDenom = maxNdv
@@ -239,16 +239,16 @@ case class InnerOuterEstimation(join: Join) extends Logging {
joinKeyPairs.foreach { case (leftKey, rightKey) =>
val leftKeyStats = leftStats.attributeStats(leftKey)
val rightKeyStats = rightStats.attributeStats(rightKey)
- val lRange = Range(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
- val rRange = Range(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
+ val lInterval = ValueInterval(leftKeyStats.min, leftKeyStats.max, leftKey.dataType)
+ val rInterval = ValueInterval(rightKeyStats.min, rightKeyStats.max, rightKey.dataType)
// When we reach here, join selectivity is not zero, so each pair of join keys should be
// intersected.
- assert(Range.isIntersected(lRange, rRange))
+ assert(ValueInterval.isIntersected(lInterval, rInterval))
// Update intersected column stats
assert(leftKey.dataType.sameType(rightKey.dataType))
val newNdv = leftKeyStats.distinctCount.min(rightKeyStats.distinctCount)
- val (newMin, newMax) = Range.intersect(lRange, rRange, leftKey.dataType)
+ val (newMin, newMax) = ValueInterval.intersect(lInterval, rInterval, leftKey.dataType)
val newMaxLen = math.min(leftKeyStats.maxLen, rightKeyStats.maxLen)
val newAvgLen = (leftKeyStats.avgLen + rightKeyStats.avgLen) / 2
val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
deleted file mode 100644
index 4ac5ba5..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/Range.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
-
-import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.types._
-
-
-/** Value range of a column. */
-trait Range {
- def contains(l: Literal): Boolean
-}
-
-/** For simplicity we use decimal to unify operations of numeric ranges. */
-case class NumericRange(min: Decimal, max: Decimal) extends Range {
- override def contains(l: Literal): Boolean = {
- val lit = EstimationUtils.toDecimal(l.value, l.dataType)
- min <= lit && max >= lit
- }
-}
-
-/**
- * This version of Spark does not have min/max for binary/string types, we define their default
- * behaviors by this class.
- */
-class DefaultRange extends Range {
- override def contains(l: Literal): Boolean = true
-}
-
-/** This is for columns with only null values. */
-class NullRange extends Range {
- override def contains(l: Literal): Boolean = false
-}
-
-object Range {
- def apply(min: Option[Any], max: Option[Any], dataType: DataType): Range = dataType match {
- case StringType | BinaryType => new DefaultRange()
- case _ if min.isEmpty || max.isEmpty => new NullRange()
- case _ =>
- NumericRange(
- min = EstimationUtils.toDecimal(min.get, dataType),
- max = EstimationUtils.toDecimal(max.get, dataType))
- }
-
- def isIntersected(r1: Range, r2: Range): Boolean = (r1, r2) match {
- case (_, _: DefaultRange) | (_: DefaultRange, _) =>
- // The DefaultRange represents string/binary types which do not have max/min stats,
- // we assume they are intersected to be conservative on estimation
- true
- case (_, _: NullRange) | (_: NullRange, _) =>
- false
- case (n1: NumericRange, n2: NumericRange) =>
- n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
- }
-
- /**
- * Intersected results of two ranges. This is only for two overlapped ranges.
- * The outputs are the intersected min/max values.
- */
- def intersect(r1: Range, r2: Range, dt: DataType): (Option[Any], Option[Any]) = {
- (r1, r2) match {
- case (_, _: DefaultRange) | (_: DefaultRange, _) =>
- // binary/string types don't support intersecting.
- (None, None)
- case (n1: NumericRange, n2: NumericRange) =>
- // Choose the maximum of two min values, and the minimum of two max values.
- val newMin = if (n1.min <= n2.min) n2.min else n1.min
- val newMax = if (n1.max <= n2.max) n1.max else n2.max
- (Some(EstimationUtils.fromDecimal(newMin, dt)),
- Some(EstimationUtils.fromDecimal(newMax, dt)))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/bf66335a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
new file mode 100644
index 0000000..0caaf79
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ValueInterval.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.types._
+
+
+/** Value interval of a column. */
+trait ValueInterval {
+ def contains(l: Literal): Boolean
+}
+
+/** For simplicity we use decimal to unify operations of numeric intervals. */
+case class NumericValueInterval(min: Decimal, max: Decimal) extends ValueInterval {
+ override def contains(l: Literal): Boolean = {
+ val lit = EstimationUtils.toDecimal(l.value, l.dataType)
+ min <= lit && max >= lit
+ }
+}
+
+/**
+ * This version of Spark does not have min/max for binary/string types, so we define their
+ * default behavior in this class.
+ */
+class DefaultValueInterval extends ValueInterval {
+ override def contains(l: Literal): Boolean = true
+}
+
+/** This is for columns with only null values. */
+class NullValueInterval extends ValueInterval {
+ override def contains(l: Literal): Boolean = false
+}
+
+object ValueInterval {
+ def apply(
+ min: Option[Any],
+ max: Option[Any],
+ dataType: DataType): ValueInterval = dataType match {
+ case StringType | BinaryType => new DefaultValueInterval()
+ case _ if min.isEmpty || max.isEmpty => new NullValueInterval()
+ case _ =>
+ NumericValueInterval(
+ min = EstimationUtils.toDecimal(min.get, dataType),
+ max = EstimationUtils.toDecimal(max.get, dataType))
+ }
+
+ def isIntersected(r1: ValueInterval, r2: ValueInterval): Boolean = (r1, r2) match {
+ case (_, _: DefaultValueInterval) | (_: DefaultValueInterval, _) =>
+ // The DefaultValueInterval represents string/binary types which do not have max/min stats,
+ // we assume they are intersected to be conservative on estimation
+ true
+ case (_, _: NullValueInterval) | (_: NullValueInterval, _) =>
+ false
+ case (n1: NumericValueInterval, n2: NumericValueInterval) =>
+ n1.min.compareTo(n2.max) <= 0 && n1.max.compareTo(n2.min) >= 0
+ }
+
+ /**
+ * Intersected results of two intervals. This is only for two overlapped intervals.
+ * The outputs are the intersected min/max values.
+ */
+ def intersect(r1: ValueInterval, r2: ValueInterval, dt: DataType): (Option[Any], Option[Any]) = {
+ (r1, r2) match {
+ case (_, _: DefaultValueInterval) | (_: DefaultValueInterval, _) =>
+ // binary/string types don't support intersecting.
+ (None, None)
+ case (n1: NumericValueInterval, n2: NumericValueInterval) =>
+ // Choose the maximum of two min values, and the minimum of two max values.
+ val newMin = if (n1.min <= n2.min) n2.min else n1.min
+ val newMax = if (n1.max <= n2.max) n1.max else n2.max
+ (Some(EstimationUtils.fromDecimal(newMin, dt)),
+ Some(EstimationUtils.fromDecimal(newMax, dt)))
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org