Posted to commits@spark.apache.org by we...@apache.org on 2016/08/09 00:37:04 UTC

spark git commit: [SPARK-16898][SQL] Adds argument type information for typed logical plans like MapElements, TypedFilter, and AppendColumns

Repository: spark
Updated Branches:
  refs/heads/master df1065883 -> bca43cd63


[SPARK-16898][SQL] Adds argument type information for typed logical plans like MapElements, TypedFilter, and AppendColumns

## What changes were proposed in this pull request?

This PR adds argument type information (the argument class and argument schema) to typed logical plans such as MapElements, TypedFilter, and AppendColumns, so that this information can be used in customized optimizer rules.
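
For illustration, here is a minimal sketch of a custom optimizer rule that reads the new fields. The rule name, the `Person` class, and the logging are hypothetical; only the `TypedFilter` fields (`func`, `argumentClass`, `argumentSchema`, `deserializer`, `child`) come from this patch.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TypedFilter}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical user type; stands in for whatever T the typed operator was written against.
case class Person(name: String, age: Int)

// Sketch of a custom rule: it only inspects TypedFilter nodes and returns the
// plan unchanged, but it shows how argumentClass and argumentSchema are now
// visible to customized optimizer rules.
object InspectTypedFilters extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case f: TypedFilter if f.argumentClass == classOf[Person] =>
      logInfo(s"TypedFilter over ${f.argumentClass.getName}, " +
        s"argument schema: ${f.argumentSchema.simpleString}")
      f
  }
}
```

Such a rule can then be registered through `spark.experimental.extraOptimizations`.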

## How was this patch tested?

Existing tests.

Author: Sean Zhong <se...@databricks.com>

Closes #14494 from clockfly/add_more_info_for_typed_operator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bca43cd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bca43cd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bca43cd6

Branch: refs/heads/master
Commit: bca43cd63503eb5287151c5d9ca6ccd8cd13fbc8
Parents: df10658
Author: Sean Zhong <se...@databricks.com>
Authored: Tue Aug 9 08:36:50 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Aug 9 08:36:50 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 13 +++++++++----
 .../spark/sql/catalyst/plans/logical/object.scala  | 17 ++++++++++++++++-
 .../spark/sql/execution/SparkStrategies.scala      |  4 ++--
 .../sql/execution/aggregate/typedaggregators.scala |  8 ++++----
 4 files changed, 31 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bca43cd6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 7513000..e34a478 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -214,7 +214,7 @@ object EliminateSerialization extends Rule[LogicalPlan] {
       val objAttr = Alias(s.inputObjAttr, s.inputObjAttr.name)(exprId = d.outputObjAttr.exprId)
       Project(objAttr :: Nil, s.child)
 
-    case a @ AppendColumns(_, _, _, s: SerializeFromObject)
+    case a @ AppendColumns(_, _, _, _, _, s: SerializeFromObject)
         if a.deserializer.dataType == s.inputObjAttr.dataType =>
       AppendColumnsWithObject(a.func, s.serializer, a.serializer, s.child)
 
@@ -223,7 +223,7 @@ object EliminateSerialization extends Rule[LogicalPlan] {
     // deserialization in condition, and push it down through `SerializeFromObject`.
     // e.g. `ds.map(...).filter(...)` can be optimized by this rule to save extra deserialization,
     // but `ds.map(...).as[AnotherType].filter(...)` can not be optimized.
-    case f @ TypedFilter(_, _, s: SerializeFromObject)
+    case f @ TypedFilter(_, _, _, _, s: SerializeFromObject)
         if f.deserializer.dataType == s.inputObjAttr.dataType =>
       s.copy(child = f.withObjectProducerChild(s.child))
 
@@ -1703,9 +1703,14 @@ case class GetCurrentDatabase(sessionCatalog: SessionCatalog) extends Rule[Logic
  */
 object CombineTypedFilters extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case t1 @ TypedFilter(_, _, t2 @ TypedFilter(_, _, child))
+    case t1 @ TypedFilter(_, _, _, _, t2 @ TypedFilter(_, _, _, _, child))
         if t1.deserializer.dataType == t2.deserializer.dataType =>
-      TypedFilter(combineFilterFunction(t2.func, t1.func), t1.deserializer, child)
+      TypedFilter(
+        combineFilterFunction(t2.func, t1.func),
+        t1.argumentClass,
+        t1.argumentSchema,
+        t1.deserializer,
+        child)
   }
 
   private def combineFilterFunction(func1: AnyRef, func2: AnyRef): Any => Boolean = {
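
For context, a hedged sketch of the Dataset pattern that CombineTypedFilters targets (the SparkSession and the sample data below are assumptions, not part of this patch): two adjacent typed filters over the same deserialized type are merged into one TypedFilter, and after this patch the merged node also carries the outer filter's argumentClass and argumentSchema.

```scala
import org.apache.spark.sql.SparkSession

// Assumed session; any existing SparkSession works the same way.
val spark = SparkSession.builder().master("local[*]").appName("combine-typed-filters").getOrCreate()
import spark.implicits._

val ds = Seq(1L, 2L, 25L, 42L).toDS()

// Both lambdas deserialize to the same type (Long), so CombineTypedFilters
// merges the two TypedFilter nodes into one; the merged node keeps the outer
// filter's argumentClass and argumentSchema.
val combined = ds.filter((x: Long) => x > 10).filter((x: Long) => x % 2 == 0)
combined.explain(true)
```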

http://git-wip-us.apache.org/repos/asf/spark/blob/bca43cd6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index e1890ed..fefe5a3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -155,6 +155,8 @@ object MapElements {
     val deserialized = CatalystSerde.deserialize[T](child)
     val mapped = MapElements(
       func,
+      implicitly[Encoder[T]].clsTag.runtimeClass,
+      implicitly[Encoder[T]].schema,
       CatalystSerde.generateObjAttr[U],
       deserialized)
     CatalystSerde.serialize[U](mapped)
@@ -166,12 +168,19 @@ object MapElements {
  */
 case class MapElements(
     func: AnyRef,
+    argumentClass: Class[_],
+    argumentSchema: StructType,
     outputObjAttr: Attribute,
     child: LogicalPlan) extends ObjectConsumer with ObjectProducer
 
 object TypedFilter {
   def apply[T : Encoder](func: AnyRef, child: LogicalPlan): TypedFilter = {
-    TypedFilter(func, UnresolvedDeserializer(encoderFor[T].deserializer), child)
+    TypedFilter(
+      func,
+      implicitly[Encoder[T]].clsTag.runtimeClass,
+      implicitly[Encoder[T]].schema,
+      UnresolvedDeserializer(encoderFor[T].deserializer),
+      child)
   }
 }
 
@@ -186,6 +195,8 @@ object TypedFilter {
  */
 case class TypedFilter(
     func: AnyRef,
+    argumentClass: Class[_],
+    argumentSchema: StructType,
     deserializer: Expression,
     child: LogicalPlan) extends UnaryNode {
 
@@ -213,6 +224,8 @@ object AppendColumns {
       child: LogicalPlan): AppendColumns = {
     new AppendColumns(
       func.asInstanceOf[Any => Any],
+      implicitly[Encoder[T]].clsTag.runtimeClass,
+      implicitly[Encoder[T]].schema,
       UnresolvedDeserializer(encoderFor[T].deserializer),
       encoderFor[U].namedExpressions,
       child)
@@ -228,6 +241,8 @@ object AppendColumns {
  */
 case class AppendColumns(
     func: Any => Any,
+    argumentClass: Class[_],
+    argumentSchema: StructType,
     deserializer: Expression,
     serializer: Seq[NamedExpression],
     child: LogicalPlan) extends UnaryNode {

http://git-wip-us.apache.org/repos/asf/spark/blob/bca43cd6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index fb08e12..4dfec3e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -356,9 +356,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>
         execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping,
           data, objAttr, planLater(child)) :: Nil
-      case logical.MapElements(f, objAttr, child) =>
+      case logical.MapElements(f, _, _, objAttr, child) =>
         execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
-      case logical.AppendColumns(f, in, out, child) =>
+      case logical.AppendColumns(f, _, _, in, out, child) =>
         execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
       case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
         execution.AppendColumnsWithObjectExec(f, childSer, newSer, planLater(child)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/bca43cd6/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
index c39a78d..1dae5f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/typedaggregators.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.expressions.Aggregator
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 
 
-class TypedSumDouble[IN](f: IN => Double) extends Aggregator[IN, Double, Double] {
+class TypedSumDouble[IN](val f: IN => Double) extends Aggregator[IN, Double, Double] {
   override def zero: Double = 0.0
   override def reduce(b: Double, a: IN): Double = b + f(a)
   override def merge(b1: Double, b2: Double): Double = b1 + b2
@@ -45,7 +45,7 @@ class TypedSumDouble[IN](f: IN => Double) extends Aggregator[IN, Double, Double]
 }
 
 
-class TypedSumLong[IN](f: IN => Long) extends Aggregator[IN, Long, Long] {
+class TypedSumLong[IN](val f: IN => Long) extends Aggregator[IN, Long, Long] {
   override def zero: Long = 0L
   override def reduce(b: Long, a: IN): Long = b + f(a)
   override def merge(b1: Long, b2: Long): Long = b1 + b2
@@ -63,7 +63,7 @@ class TypedSumLong[IN](f: IN => Long) extends Aggregator[IN, Long, Long] {
 }
 
 
-class TypedCount[IN](f: IN => Any) extends Aggregator[IN, Long, Long] {
+class TypedCount[IN](val f: IN => Any) extends Aggregator[IN, Long, Long] {
   override def zero: Long = 0
   override def reduce(b: Long, a: IN): Long = {
     if (f(a) == null) b else b + 1
@@ -82,7 +82,7 @@ class TypedCount[IN](f: IN => Any) extends Aggregator[IN, Long, Long] {
 }
 
 
-class TypedAverage[IN](f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
+class TypedAverage[IN](val f: IN => Double) extends Aggregator[IN, (Double, Long), Double] {
   override def zero: (Double, Long) = (0.0, 0L)
   override def reduce(b: (Double, Long), a: IN): (Double, Long) = (f(a) + b._1, 1 + b._2)
   override def finish(reduction: (Double, Long)): Double = reduction._1 / reduction._2

