Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/28 04:42:13 UTC

git commit: [SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in constant folding

Repository: spark
Updated Branches:
  refs/heads/master 7e3a1ada8 -> 418ad83fe


[SPARK-3911] [SQL] HiveSimpleUdf can not be optimized in constant folding

```
explain extended select cos(null) from src limit 1;
```
outputs:
```
== Analyzed Logical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
  MetastoreRelation default, src, None

== Optimized Logical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
  MetastoreRelation default, src, None

== Physical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#5]
  HiveTableScan [], (MetastoreRelation default, src, None), None
```
After applying this PR, it outputs:
```
== Parsed Logical Plan ==
Limit 1
 Project ['cos(null) AS c_0#0]
  UnresolvedRelation None, src, None

== Analyzed Logical Plan ==
Limit 1
 Project [HiveSimpleUdf#org.apache.hadoop.hive.ql.udf.UDFCos(null) AS c_0#0]
  MetastoreRelation default, src, None

== Optimized Logical Plan ==
Limit 1
 Project [null AS c_0#0]
  MetastoreRelation default, src, None

== Physical Plan ==
Limit 1
 Project [null AS c_0#0]
  HiveTableScan [], (MetastoreRelation default, src, None), None
```
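
For context on why the plans above differ: Catalyst's constant-folding optimization evaluates any expression that reports `foldable = true` once at planning time and replaces it with a literal, which is what collapses `UDFCos(null)` to `null` in the optimized plan. The sketch below is a simplified, self-contained model of that idea, written purely for illustration; `Expr`, `Lit`, and `Cos` are invented stand-ins, not the real Catalyst classes.

```
// Simplified model of constant folding, for illustration only (not Catalyst code).
sealed trait Expr {
  def foldable: Boolean
  def eval(): Any
}

case class Lit(value: Any) extends Expr {
  def foldable = true
  def eval() = value
}

// Stand-in for a deterministic UDF such as cos: foldable when its input is foldable.
case class Cos(child: Expr) extends Expr {
  def foldable = child.foldable
  def eval() = child.eval() match {
    case null      => null
    case d: Double => math.cos(d)
  }
}

// The folding step: evaluate a foldable expression once and replace it with a literal.
def constantFold(e: Expr): Expr =
  if (e.foldable) Lit(e.eval()) else e

println(constantFold(Cos(Lit(null))))  // Lit(null), mirroring the optimized plan above
println(constantFold(Cos(Lit(0.0))))   // Lit(1.0)
```

The patch below makes HiveSimpleUdf participate in exactly this kind of folding by overriding `foldable`.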

Author: Cheng Hao <ha...@intel.com>

Closes #2771 from chenghao-intel/hive_udf_constant_folding and squashes the following commits:

1379c73 [Cheng Hao] duplicate the PlanTest with catalyst/plans/PlanTest
1e52dda [Cheng Hao] add unit test for hive simple udf constant folding
01609ff [Cheng Hao] support constant folding for HiveSimpleUdf


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/418ad83f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/418ad83f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/418ad83f

Branch: refs/heads/master
Commit: 418ad83fe113f2f90552eb7247670279b55aed28
Parents: 7e3a1ad
Author: Cheng Hao <ha...@intel.com>
Authored: Mon Oct 27 20:42:05 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Oct 27 20:42:05 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/plans/PlanTest.scala     |  3 +-
 .../org/apache/spark/sql/hive/hiveUdfs.scala    | 10 ++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |  4 +-
 .../spark/sql/catalyst/plans/PlanTest.scala     | 57 ++++++++++++++++++++
 .../spark/sql/hive/execution/HivePlanTest.scala | 32 +++++++++++
 5 files changed, 104 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
index 7e9f47e..c4a1f89 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -33,7 +33,8 @@ class PlanTest extends FunSuite {
    * we must normalize them to check if two different queries are identical.
    */
   protected def normalizeExprIds(plan: LogicalPlan) = {
-    val minId = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id)).min
+    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
+    val minId = if (list.isEmpty) 0 else list.min
     plan transformAllExpressions {
       case a: AttributeReference =>
         AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
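
A note on the guard added above: once a query is fully folded, a plan like `Project [null AS c_0#0]` contains no attribute references at all, and calling `min` on the resulting empty collection throws rather than returning a default. A minimal standalone illustration of the failure mode the guard avoids:

```
val ids: Seq[Long] = Seq.empty

// ids.min would throw java.lang.UnsupportedOperationException: empty.min
val minId = if (ids.isEmpty) 0L else ids.min

println(minId)  // 0
```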

http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 68f93f2..683c820 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -99,6 +99,16 @@ private[hive] case class HiveSimpleUdf(functionClassName: String, children: Seq[
   @transient
   protected lazy val arguments = children.map(c => toInspector(c.dataType)).toArray
 
+  @transient
+  protected lazy val isUDFDeterministic = {
+    val udfType = function.getClass().getAnnotation(classOf[HiveUDFType])
+    udfType != null && udfType.deterministic()
+  }
+
+  override def foldable = {
+    isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
+  }
+
   // Create parameter converters
   @transient
   protected lazy val conversionHelper = new ConversionHelper(method, arguments)
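
Two conditions gate the new `foldable` override: the UDF class must carry the `HiveUDFType` annotation with `deterministic = true`, and every argument must itself be foldable (the `foldLeft` is equivalent to `children.forall(_.foldable)`). The annotation lookup is plain Java reflection on the UDF class; the snippet below demonstrates the same reflective pattern using `java.lang.Deprecated` purely as a stand-in runtime-retained annotation, not the actual Hive one.

```
// Illustration of the reflective check behind isUDFDeterministic.
// @Deprecated stands in for Hive's deterministic marker; both are retained at
// runtime, so Class.getAnnotation can observe them.
@Deprecated class MarkedUdf
class UnmarkedUdf

def carriesMarker(udfClass: Class[_]): Boolean =
  udfClass.getAnnotation(classOf[Deprecated]) != null

println(carriesMarker(classOf[MarkedUdf]))   // true
println(carriesMarker(classOf[UnmarkedUdf])) // false
```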

http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 95921c3..6b06410 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.sql.catalyst.expressions.{ExprId, AttributeReference}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 
@@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.util._
  * It is hard to have maven allow one subproject depend on another subprojects test code.
  * So, we duplicate this code here.
  */
-class QueryTest extends FunSuite {
+class QueryTest extends PlanTest {
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param rdd the [[SchemaRDD]] to be executed

http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
new file mode 100644
index 0000000..081d94b
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, ExprId}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util._
+import org.scalatest.FunSuite
+
+/**
+ * *** DUPLICATED FROM sql/catalyst/plans. ***
+ *
+ * It is hard to have maven allow one subproject depend on another subprojects test code.
+ * So, we duplicate this code here.
+ */
+class PlanTest extends FunSuite {
+
+  /**
+   * Since attribute references are given globally unique ids during analysis,
+   * we must normalize them to check if two different queries are identical.
+   */
+  protected def normalizeExprIds(plan: LogicalPlan) = {
+    val list = plan.flatMap(_.expressions.flatMap(_.references).map(_.exprId.id))
+    val minId = if (list.isEmpty) 0 else list.min
+    plan transformAllExpressions {
+      case a: AttributeReference =>
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(a.exprId.id - minId))
+    }
+  }
+
+  /** Fails the test if the two plans do not match */
+  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
+    val normalized1 = normalizeExprIds(plan1)
+    val normalized2 = normalizeExprIds(plan2)
+    if (normalized1 != normalized2)
+      fail(
+        s"""
+          |== FAIL: Plans do not match ===
+          |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+        """.stripMargin)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/418ad83f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
new file mode 100644
index 0000000..c939e6e
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.hive.test.TestHive
+
+class HivePlanTest extends QueryTest {
+  import TestHive._
+
+  test("udf constant folding") {
+    val optimized = sql("SELECT cos(null) FROM src").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM src").queryExecution.optimizedPlan
+
+    comparePlans(optimized, correctAnswer)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org