You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2017/01/09 05:15:55 UTC

spark git commit: [SPARK-17077][SQL] Cardinality estimation for project operator

Repository: spark
Updated Branches:
  refs/heads/master 19d9d4c85 -> 3ccabdfb4


[SPARK-17077][SQL] Cardinality estimation for project operator

## What changes were proposed in this pull request?

Support cardinality estimation for project operator.

## How was this patch tested?

Add a test suite and a base class in the catalyst package.

Author: Zhenhua Wang <wz...@163.com>

Closes #16430 from wzhfy/projectEstimation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3ccabdfb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3ccabdfb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3ccabdfb

Branch: refs/heads/master
Commit: 3ccabdfb4d760d684b1e0c0ed448a57331f209f2
Parents: 19d9d4c
Author: Zhenhua Wang <wz...@163.com>
Authored: Sun Jan 8 21:15:52 2017 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sun Jan 8 21:15:52 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/AttributeMap.scala |  2 +
 .../plans/logical/basicLogicalOperators.scala   |  4 ++
 .../statsEstimation/EstimationUtils.scala       | 54 ++++++++++++++++++++
 .../statsEstimation/ProjectEstimation.scala     | 44 ++++++++++++++++
 .../ProjectEstimationSuite.scala                | 51 ++++++++++++++++++
 .../StatsEstimationTestBase.scala               | 41 +++++++++++++++
 6 files changed, 196 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3ccabdfb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
index 96a11e3..1504a52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeMap.scala
@@ -33,6 +33,8 @@ class AttributeMap[A](baseMap: Map[ExprId, (Attribute, A)])
 
   override def get(k: Attribute): Option[A] = baseMap.get(k.exprId).map(_._2)
 
+  override def contains(k: Attribute): Boolean = get(k).isDefined
+
   override def + [B1 >: A](kv: (Attribute, B1)): Map[Attribute, B1] = baseMap.values.toMap + kv
 
   override def iterator: Iterator[(Attribute, A)] = baseMap.valuesIterator

http://git-wip-us.apache.org/repos/asf/spark/blob/3ccabdfb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c977e78..9b52a9c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.ProjectEstimation
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -53,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extend
 
   override def validConstraints: Set[Expression] =
     child.constraints.union(getAliasedConstraints(projectList))
+
+  override lazy val statistics: Statistics =
+    ProjectEstimation.estimate(this).getOrElse(super.statistics)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3ccabdfb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
new file mode 100644
index 0000000..f099e32
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan}
+import org.apache.spark.sql.types.StringType
+
+
+object EstimationUtils {
+
+  /** Check if each plan has rowCount in its statistics. */
+  def rowCountsExist(plans: LogicalPlan*): Boolean =
+    plans.forall(_.statistics.rowCount.isDefined)
+
+  /** Get column stats for output attributes. */
+  def getOutputMap(inputMap: AttributeMap[ColumnStat], output: Seq[Attribute])
+    : AttributeMap[ColumnStat] = {
+    AttributeMap(output.flatMap(a => inputMap.get(a).map(a -> _)))
+  }
+
+  def getRowSize(attributes: Seq[Attribute], attrStats: AttributeMap[ColumnStat]): Long = {
+    // We assign a generic overhead for a Row object, the actual overhead is different for different
+    // Row format.
+    8 + attributes.map { attr =>
+      if (attrStats.contains(attr)) {
+        attr.dataType match {
+          case StringType =>
+            // UTF8String: base + offset + numBytes
+            attrStats(attr).avgLen + 8 + 4
+          case _ =>
+            attrStats(attr).avgLen
+        }
+      } else {
+        attr.dataType.defaultSize
+      }
+    }.sum
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3ccabdfb/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
new file mode 100644
index 0000000..6d63b09
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/ProjectEstimation.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.logical.{Project, Statistics}
+
+object ProjectEstimation {
+  import EstimationUtils._
+
+  def estimate(project: Project): Option[Statistics] = {
+    if (rowCountsExist(project.child)) {
+      val childStats = project.child.statistics
+      val inputAttrStats = childStats.attributeStats
+      // Match alias with its child's column stat
+      val aliasStats = project.expressions.collect {
+        case alias @ Alias(attr: Attribute, _) if inputAttrStats.contains(attr) =>
+          alias.toAttribute -> inputAttrStats(attr)
+      }
+      val outputAttrStats =
+        getOutputMap(AttributeMap(inputAttrStats.toSeq ++ aliasStats), project.output)
+      Some(childStats.copy(
+        sizeInBytes = childStats.rowCount.get * getRowSize(project.output, outputAttrStats),
+        attributeStats = outputAttrStats))
+    } else {
+      None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3ccabdfb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
new file mode 100644
index 0000000..4a1bed8
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils._
+import org.apache.spark.sql.types.IntegerType
+
+
+class ProjectEstimationSuite extends StatsEstimationTestBase {
+
+  test("estimate project with alias") {
+    val ar1 = AttributeReference("key1", IntegerType)()
+    val ar2 = AttributeReference("key2", IntegerType)()
+    val colStat1 = ColumnStat(2, Some(1), Some(2), 0, 4, 4)
+    val colStat2 = ColumnStat(1, Some(10), Some(10), 0, 4, 4)
+
+    val child = StatsTestPlan(
+      outputList = Seq(ar1, ar2),
+      stats = Statistics(
+        sizeInBytes = 2 * (4 + 4),
+        rowCount = Some(2),
+        attributeStats = AttributeMap(Seq(ar1 -> colStat1, ar2 -> colStat2))))
+
+    val project = Project(Seq(ar1, Alias(ar2, "abc")()), child)
+    val expectedColStats = Seq("key1" -> colStat1, "abc" -> colStat2)
+    val expectedAttrStats = toAttributeMap(expectedColStats, project)
+    // The number of rows won't change for project.
+    val expectedStats = Statistics(
+      sizeInBytes = 2 * getRowSize(project.output, expectedAttrStats),
+      rowCount = Some(2),
+      attributeStats = expectedAttrStats)
+    assert(project.statistics == expectedStats)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3ccabdfb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
new file mode 100644
index 0000000..fa5b290
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.statsEstimation
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LeafNode, LogicalPlan, Statistics}
+
+
+class StatsEstimationTestBase extends SparkFunSuite {
+
+  /** Convert (column name, column stat) pairs to an AttributeMap based on plan output. */
+  def toAttributeMap(colStats: Seq[(String, ColumnStat)], plan: LogicalPlan)
+    : AttributeMap[ColumnStat] = {
+    val nameToAttr: Map[String, Attribute] = plan.output.map(a => (a.name, a)).toMap
+    AttributeMap(colStats.map(kv => nameToAttr(kv._1) -> kv._2))
+  }
+}
+
+/**
+ * This class is used for unit-testing. It's a logical plan whose output and stats are passed in.
+ */
+protected case class StatsTestPlan(outputList: Seq[Attribute], stats: Statistics) extends LeafNode {
+  override def output: Seq[Attribute] = outputList
+  override lazy val statistics = stats
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org