You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/03 20:25:37 UTC

git commit: [SPARK-2693][SQL] Supported for UDAF Hive Aggregates like PERCENTILE

Repository: spark
Updated Branches:
  refs/heads/master 9d320e222 -> 22f8e1ee7


[SPARK-2693][SQL] Supported for UDAF Hive Aggregates like PERCENTILE

Implemented support for Hive UDAF aggregates by adding a wrapper expression to Spark's Hive module.

Author: ravipesala <ra...@huawei.com>

Closes #2620 from ravipesala/SPARK-2693 and squashes the following commits:

a8df326 [ravipesala] Removed resolver from constructor arguments
caf25c6 [ravipesala] Fixed style issues
5786200 [ravipesala] Supported for UDAF Hive Aggregates like PERCENTILE


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22f8e1ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22f8e1ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22f8e1ee

Branch: refs/heads/master
Commit: 22f8e1ee7c4ea7b3bd4c6faaf0fe5b88a134ae12
Parents: 9d320e2
Author: ravipesala <ra...@huawei.com>
Authored: Fri Oct 3 11:25:18 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 3 11:25:18 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/hiveUdfs.scala    | 46 ++++++++++++++++++--
 .../spark/sql/hive/execution/HiveUdfSuite.scala |  4 ++
 2 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22f8e1ee/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 732e497..68f93f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.hadoop.hive.ql.exec.{UDF, UDAF}
 import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
 import org.apache.hadoop.hive.ql.udf.generic._
@@ -57,7 +57,8 @@ private[hive] abstract class HiveFunctionRegistry
     } else if (
          classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
       HiveGenericUdaf(functionClassName, children)
-
+    } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
+      HiveUdaf(functionClassName, children)
     } else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
       HiveGenericUdtf(functionClassName, Nil, children)
     } else {
@@ -194,6 +195,37 @@ private[hive] case class HiveGenericUdaf(
   def newInstance() = new HiveUdafFunction(functionClassName, children, this)
 }
 
+/** Wraps Hive functions that implement the UDAF interface as an aggregate expression. */
+private[hive] case class HiveUdaf(
+    functionClassName: String,
+    children: Seq[Expression]) extends AggregateExpression
+  with HiveInspectors
+  with HiveFunctionFactory {
+
+  type UDFType = UDAF
+
+  @transient
+  protected lazy val resolver: AbstractGenericUDAFResolver = new GenericUDAFBridge(createFunction())
+
+  @transient
+  protected lazy val objectInspector  = {
+    resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
+      .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
+  }
+
+  @transient
+  protected lazy val inspectors = children.map(_.dataType).map(toInspector)
+
+  def dataType: DataType = inspectorToDataType(objectInspector) // from the evaluator's init()
+
+  def nullable: Boolean = true
+
+  override def toString = s"$nodeName#$functionClassName(${children.mkString(",")})"
+
+  def newInstance() =
+    new HiveUdafFunction(functionClassName, children, this, true) // isUDAFBridgeRequired = true
+}
+
 /**
  * Converts a Hive Generic User Defined Table Generating Function (UDTF) to a
  * [[catalyst.expressions.Generator Generator]].  Note that the semantics of Generators do not allow
@@ -275,14 +307,20 @@ private[hive] case class HiveGenericUdtf(
 private[hive] case class HiveUdafFunction(
     functionClassName: String,
     exprs: Seq[Expression],
-    base: AggregateExpression)
+    base: AggregateExpression,
+    isUDAFBridgeRequired: Boolean = false)
   extends AggregateFunction
   with HiveInspectors
   with HiveFunctionFactory {
 
   def this() = this(null, null, null)
 
-  private val resolver = createFunction[AbstractGenericUDAFResolver]()
+  private val resolver =
+    if (isUDAFBridgeRequired) {
+      new GenericUDAFBridge(createFunction[UDAF]())
+    } else {
+      createFunction[AbstractGenericUDAFResolver]()
+    }
 
   private val inspectors = exprs.map(_.dataType).map(toInspector).toArray
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22f8e1ee/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index cc125d5..e4324e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -79,6 +79,10 @@ class HiveUdfSuite extends HiveComparisonTest {
     sql("SELECT testUdf(pair) FROM hiveUdfTestTable")
     sql("DROP TEMPORARY FUNCTION IF EXISTS testUdf")
   }
+
+  test("SPARK-2693 udaf aggregates test") {
+    assert(sql("SELECT percentile(key,1) FROM src").first === sql("SELECT max(key) FROM src").first)
+  }
 }
 
 class TestPair(x: Int, y: Int) extends Writable with Serializable {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org