You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2015/12/28 21:23:31 UTC

spark git commit: [SPARK-12287][SQL] Support UnsafeRow in MapPartitions/MapGroups/CoGroup

Repository: spark
Updated Branches:
  refs/heads/master 73b70f076 -> e01c6c866


[SPARK-12287][SQL] Support UnsafeRow in MapPartitions/MapGroups/CoGroup

Support UnsafeRow in MapPartitions/MapGroups/CoGroup.

Added a test case for MapPartitions. Since MapGroups and CoGroup are built on AppendColumns, all the related dataset test cases can already verify the correctness when MapGroups and CoGroup process unsafe rows.

davies cloud-fan Not sure if my understanding is right, please correct me. Thank you!

Author: gatorsmile <ga...@gmail.com>

Closes #10398 from gatorsmile/unsafeRowMapGroup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e01c6c86
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e01c6c86
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e01c6c86

Branch: refs/heads/master
Commit: e01c6c8664d74d434e9b6b3c8c70570f01d4a0a4
Parents: 73b70f0
Author: gatorsmile <ga...@gmail.com>
Authored: Mon Dec 28 12:23:28 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Dec 28 12:23:28 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/execution/basicOperators.scala    | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e01c6c86/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
index 21325be..6b7b3bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala
@@ -370,6 +370,10 @@ case class MapPartitions[T, U](
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override protected def doExecute(): RDD[InternalRow] = {
     child.execute().mapPartitionsInternal { iter =>
       val tBoundEncoder = tEncoder.bind(child.output)
@@ -391,6 +395,7 @@ case class AppendColumns[T, U](
   // We are using an unsafe combiner.
   override def canProcessSafeRows: Boolean = false
   override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
 
   override def output: Seq[Attribute] = child.output ++ newColumns
 
@@ -420,6 +425,10 @@ case class MapGroups[K, T, U](
     output: Seq[Attribute],
     child: SparkPlan) extends UnaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(groupingAttributes) :: Nil
 
@@ -459,6 +468,10 @@ case class CoGroup[Key, Left, Right, Result](
     left: SparkPlan,
     right: SparkPlan) extends BinaryNode {
 
+  override def canProcessSafeRows: Boolean = true
+  override def canProcessUnsafeRows: Boolean = true
+  override def outputsUnsafeRows: Boolean = true
+
   override def requiredChildDistribution: Seq[Distribution] =
     ClusteredDistribution(leftGroup) :: ClusteredDistribution(rightGroup) :: Nil
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org