You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/28 04:42:13 UTC
git commit: [SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in
constant folding
Repository: spark
Updated Branches:
refs/heads/master 7e3a1ada8 -> 418ad83fe
[SPARK-3911] [SQL] HiveSimpleUdf cannot be optimized in constant folding
```
explain extended select cos(null) from src limit 1;
```
Before this patch, it outputs:
```
Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
MetastoreRelation default, src, None
== Optimized Logical Plan ==
Limit 1
Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
MetastoreRelation default, src, None
== Physical Plan ==
Limit 1
Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
HiveTableScan [], (MetastoreRelation default, src, None), None
```
After applying this PR, it outputs:
```
== Parsed Logical Plan ==
Limit 1
Project ['cos(null) AS c_0#0]
UnresolvedRelation None, src, None
== Analyzed Logical Plan ==
Limit 1
Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#0]
MetastoreRelation default, src, None
== Optimized Logical Plan ==
Limit 1
Project [null AS c_0#0]
MetastoreRelation default, src, None
== Physical Plan ==
Limit 1
Project [null AS c_0#0]
HiveTableScan [], (MetastoreRelation default, src, None), None
```
Author: Cheng Hao <ha...@intel.com>
Closes #2771 from chenghao-intel/hive_udf_constant_folding and squashes the following commits:
1379c73 [Cheng Hao] duplicate the PlanTest with catalyst/plans/PlanTest
1e52dda [Cheng Hao] add unit test for hive simple udf constant folding
01609ff [Cheng Hao] support constant folding for HiveSimpleUdf
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/418ad83f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/418ad83f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/418ad83f
Branch: refs/heads/master
Commit: 418ad83fe113f2f90552eb7247670279b55aed28
Parents: 7e3a1ad
Author: Cheng Hao <ha...@intel.com>
Authored: Mon Oct 27 20:42:05 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Oct 27 20:42:05 2014 -0700
----------------------------------------------------------------------
.../spark/sql/catalyst/plans/PlanTest.scala | 3 +-
.../org/apache/spark/sql/hive/hiveUdfs.scala | 10 ++++
.../scala/org/apache/spark/sql/QueryTest.scala | 4 +-
.../spark/sql/catalyst/plans/PlanTest.scala | 57 ++++++++++++++++++++
.../spark/sql/hive/execution/HivePlanTest.scala | 32 +++++++++++
5 files changed, 104 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 7e9f47e..c4a1f89 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -33,7 +33,8 @@ class PlanTest extends FunSuite {
* we must normalize them to check if two different queries are identical.
*/
protected def normalizeExprIds(plan: LogicalPlan) = {
- val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min
+ val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
+ val minId = if (list.isEmpty) 0 else list.min
plan transformAllExpressions {
case a: AttributeReference =>
AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 68f93f2..683c820 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -99,6 +99,16 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
@transient
protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
+  /**
+   * Whether the wrapped Hive UDF can be treated as deterministic for optimization purposes.
+   *
+   * Only a UDF class explicitly annotated with Hive's `@UDFType(deterministic = true)` and not
+   * also marked `stateful` qualifies. Hive documents that a stateful UDF is non-deterministic
+   * regardless of what `deterministic()` returns. A class without the annotation is
+   * conservatively treated as non-deterministic.
+   */
+  @transient
+  protected lazy val isUDFDeterministic: Boolean = {
+    // getAnnotation yields null when the UDF class carries no @UDFType annotation.
+    Option(function.getClass().getAnnotation(classOf[HiveUDFType]))
+      .exists(udfType => udfType.deterministic() && !udfType.stateful())
+  }
+
+  /**
+   * A call to this UDF may be constant-folded only when the UDF itself is deterministic and
+   * every argument expression is foldable. `forall` stops at the first non-foldable child.
+   */
+  override def foldable: Boolean = isUDFDeterministic && children.forall(_.foldable)
+
// Create parameter converters
@transient
protected lazy val conversionHelper = new ConversionHelper(method, arguments)
http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 95921c3..6b06410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
* It is hard to have maven allow one subproject depend on another subprojects test code.
* So, we duplicate this code here.
*/
-class QueryTest extends FunSuite {
+class QueryTest extends PlanTest {
/**
* Runs the plan and makes sure the answer matches the expected result.
* @param rdd the [[SchemaRDD]] to be executed
http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
new file mode 100644
index 0000000..081d94b
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.scalatest.FunSuite
+
+/**
+ * *** DUPLICATED FROM sql/catalyst/plans. ***
+ *
+ * It is hard to have maven allow one subproject to depend on another subproject's test code.
+ * So, we duplicate this code here. Keep its behavior in sync with the catalyst copy.
+ */
+class PlanTest extends FunSuite {
+
+  /**
+   * Since attribute references are given globally unique ids during analysis,
+   * we must normalize them to check if two different queries are identical.
+   *
+   * Every [[AttributeReference]] is rebuilt with its id shifted by the smallest referenced id
+   * in the plan. When the plan references no attributes (for example, when all projections
+   * were constant-folded to literals), an offset of 0 is used instead of failing on `min`.
+   */
+  protected def normalizeExprIds(plan: LogicalPlan): LogicalPlan = {
+    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
+    val minId = if (list.isEmpty) 0 else list.min
+    plan transformAllExpressions {
+      case a: AttributeReference =>
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+    }
+  }
+
+  /**
+   * Fails the test if the two plans do not match after expression-id normalization.
+   * The failure message shows both plan trees side by side.
+   */
+  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+    val normalized1 = normalizeExprIds(plan1)
+    val normalized2 = normalizeExprIds(plan2)
+    if (normalized1 != normalized2) {
+      fail(
+        s"""
+          |== FAIL: Plans do not match ===
+          |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+        """.stripMargin)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
new file mode 100644
index 0000000..c939e6e
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHive
+
+/** Plan-level checks for Hive-specific optimizer behavior, run against the TestHive context. */
+class HivePlanTest extends QueryTest {
+  import TestHive._
+
+  /** Returns the optimized logical plan that TestHive produces for `query`. */
+  private def optimizedPlanOf(query: String) = sql(query).queryExecution.optimizedPlan
+
+  test("udf constant folding") {
+    // The cos(null) call is expected to be folded away, leaving the same optimized plan
+    // as a query that projects a typed null literal directly.
+    comparePlans(
+      optimizedPlanOf("SELECT cos(null) FROM src"),
+      optimizedPlanOf("SELECT cast(null as double) FROM src"))
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org