You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/03 20:25:37 UTC
git commit: [SPARK-2693][SQL] Support for UDAF Hive Aggregates like
PERCENTILE
Repository: spark
Updated Branches:
refs/heads/master 9d320e222 -> 22f8e1ee7
[SPARK-2693][SQL] Support for UDAF Hive Aggregates like PERCENTILE
Implemented UDAF Hive aggregates by adding a wrapper to Spark Hive.
Author: ravipesala <ra...@huawei.com>
Closes #2620 from ravipesala/SPARK-2693 and squashes the following commits:
a8df326 [ravipesala] Removed resolver from constructor arguments
caf25c6 [ravipesala] Fixed style issues
5786200 [ravipesala] Supported for UDAF Hive Aggregates like PERCENTILE
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22f8e1ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22f8e1ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22f8e1ee
Branch: refs/heads/master
Commit: 22f8e1ee7c4ea7b3bd4c6faaf0fe5b88a134ae12
Parents: 9d320e2
Author: ravipesala <ra...@huawei.com>
Authored: Fri Oct 3 11:25:18 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Fri Oct 3 11:25:18 2014 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/hive/hiveUdfs.scala | 46 ++++++++++++++++++--
.../spark/sql/hive/execution/HiveUdfSuite.scala | 4 ++
2 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/22f8e1ee/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index 732e497..68f93f2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
import scala.collection.mutable.ArrayBuffer
import org.apache.hadoop.hive.common.`type`.HiveDecimal
-import org.apache.hadoop.hive.ql.exec.UDF
+import org.apache.hadoop.hive.ql.exec.{UDF, UDAF}
import org.apache.hadoop.hive.ql.exec.{FunctionInfo, FunctionRegistry}
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
import org.apache.hadoop.hive.ql.udf.generic._
@@ -57,7 +57,8 @@ private[hive] abstract class HiveFunctionRegistry
} else if (
classOf[AbstractGenericUDAFResolver].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveGenericUdaf(functionClassName, children)
-
+ } else if (classOf[UDAF].isAssignableFrom(functionInfo.getFunctionClass)) {
+ HiveUdaf(functionClassName, children)
} else if (classOf[GenericUDTF].isAssignableFrom(functionInfo.getFunctionClass)) {
HiveGenericUdtf(functionClassName, Nil, children)
} else {
@@ -194,6 +195,37 @@ private[hive] case class HiveGenericUdaf(
def newInstance() = new HiveUdafFunction(functionClassName, children, this)
}
+/** It is used as a wrapper for the hive functions which uses UDAF interface */
+private[hive] case class HiveUdaf(
+ functionClassName: String,
+ children: Seq[Expression]) extends AggregateExpression
+ with HiveInspectors
+ with HiveFunctionFactory {
+
+ type UDFType = UDAF
+
+ @transient
+ protected lazy val resolver: AbstractGenericUDAFResolver = new GenericUDAFBridge(createFunction())
+
+ @transient
+ protected lazy val objectInspector = {
+ resolver.getEvaluator(children.map(_.dataType.toTypeInfo).toArray)
+ .init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors.toArray)
+ }
+
+ @transient
+ protected lazy val inspectors = children.map(_.dataType).map(toInspector)
+
+ def dataType: DataType = inspectorToDataType(objectInspector)
+
+ def nullable: Boolean = true
+
+ override def toString = s"$nodeName#$functionClassName(${children.mkString(",")})"
+
+ def newInstance() =
+ new HiveUdafFunction(functionClassName, children, this, true)
+}
+
/**
* Converts a Hive Generic User Defined Table Generating Function (UDTF) to a
* [[catalyst.expressions.Generator Generator]]. Note that the semantics of Generators do not allow
@@ -275,14 +307,20 @@ private[hive] case class HiveGenericUdtf(
private[hive] case class HiveUdafFunction(
functionClassName: String,
exprs: Seq[Expression],
- base: AggregateExpression)
+ base: AggregateExpression,
+ isUDAFBridgeRequired: Boolean = false)
extends AggregateFunction
with HiveInspectors
with HiveFunctionFactory {
def this() = this(null, null, null)
- private val resolver = createFunction[AbstractGenericUDAFResolver]()
+ private val resolver =
+ if (isUDAFBridgeRequired) {
+ new GenericUDAFBridge(createFunction[UDAF]())
+ } else {
+ createFunction[AbstractGenericUDAFResolver]()
+ }
private val inspectors = exprs.map(_.dataType).map(toInspector).toArray
http://git-wip-us.apache.org/repos/asf/spark/blob/22f8e1ee/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
index cc125d5..e4324e9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUdfSuite.scala
@@ -79,6 +79,10 @@ class HiveUdfSuite extends HiveComparisonTest {
sql("SELECT testUdf(pair) FROM hiveUdfTestTable")
sql("DROP TEMPORARY FUNCTION IF EXISTS testUdf")
}
+
+ test("SPARK-2693 udaf aggregates test") {
+ assert(sql("SELECT percentile(key,1) FROM src").first === sql("SELECT max(key) FROM src").first)
+ }
}
class TestPair(x: Int, y: Int) extends Writable with Serializable {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org