Posted to commits@spark.apache.org by jo...@apache.org on 2016/02/16 19:55:05 UTC

spark git commit: [SPARK-12976][SQL] Add LazilyGeneratedOrdering and use it for RangePartitioner of Exchange.

Repository: spark
Updated Branches:
  refs/heads/master 00c72d27b -> 19dc69de7


[SPARK-12976][SQL] Add LazilyGeneratedOrdering and use it for RangePartitioner of Exchange.

Adds `LazilyGeneratedOrdering`, a serializable wrapper that lets the `RangePartitioner` of `Exchange` use a generated ordering instead of `InterpretedOrdering`. The generated comparator is held in a transient field and regenerated from the `SortOrder` expressions when the wrapper is deserialized.

Author: Takuya UESHIN <ue...@happy-camper.st>

Closes #10894 from ueshin/issues/SPARK-12976.
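
The core of the patch is a standard Java-serialization idiom: keep the cheap,
serializable description of the ordering (the `Seq[SortOrder]`) as the object's
state, mark the expensive generated comparator transient, and rebuild it in
readObject after deserialization. A minimal, self-contained sketch of that
idiom follows; `Row` and `LazilyBuiltOrdering` are hypothetical stand-ins,
not Spark's API:

    import java.io.ObjectInputStream

    // Hypothetical stand-in for Spark's InternalRow.
    final case class Row(values: Array[Int])

    // scala.math.Ordering already extends java.io.Serializable, so instances
    // of this class can cross the wire; only the built comparator cannot.
    class LazilyBuiltOrdering(val keyIndices: Seq[Int]) extends Ordering[Row] {

      // Cheap to describe, expensive to build, not serializable itself.
      @transient private[this] var built: Ordering[Row] = build()

      private def build(): Ordering[Row] = new Ordering[Row] {
        def compare(a: Row, b: Row): Int =
          keyIndices.iterator
            .map(i => Integer.compare(a.values(i), b.values(i)))
            .find(_ != 0)
            .getOrElse(0)
      }

      override def compare(a: Row, b: Row): Int = built.compare(a, b)

      // Java serialization hook: rebuild the transient comparator on the
      // deserializing JVM, mirroring LazilyGeneratedOrdering.readObject below.
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        built = build()
      }
    }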


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19dc69de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19dc69de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19dc69de

Branch: refs/heads/master
Commit: 19dc69de795eb08f3bab4988ad88732bf8ca7bae
Parents: 00c72d2
Author: Takuya UESHIN <ue...@happy-camper.st>
Authored: Tue Feb 16 10:54:44 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Feb 16 10:54:44 2016 -0800

----------------------------------------------------------------------
 .../expressions/codegen/GenerateOrdering.scala  | 37 ++++++++++++++++++++
 .../apache/spark/sql/execution/Exchange.scala   |  6 ++--
 .../org/apache/spark/sql/execution/limit.scala  |  7 ++--
 3 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/19dc69de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 6de5753..5756f20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.catalyst.expressions.codegen
 
+import java.io.ObjectInputStream
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 /**
  * Inherits some default implementation for Java from `Ordering[Row]`
@@ -138,3 +141,37 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
     CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
   }
 }
+
+/**
+ * A lazily generated row ordering comparator.
+ */
+class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+
+  def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+    this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
+  @transient
+  private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
+
+  def compare(a: InternalRow, b: InternalRow): Int = {
+    generatedOrdering.compare(a, b)
+  }
+
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+    in.defaultReadObject()
+    generatedOrdering = GenerateOrdering.generate(ordering)
+  }
+}
+
+object LazilyGeneratedOrdering {
+
+  /**
+   * Creates a [[LazilyGeneratedOrdering]] for the given schema, in natural ascending order.
+   */
+  def forSchema(schema: StructType): LazilyGeneratedOrdering = {
+    new LazilyGeneratedOrdering(schema.zipWithIndex.map {
+      case (field, ordinal) =>
+        SortOrder(BoundReference(ordinal, field.dataType, nullable = true), Ascending)
+    })
+  }
+}
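
For illustration, driver-side usage of the new helper might look like the
sketch below (the schema is hypothetical; `InternalRow` construction is
elided):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType)))

    // Natural ascending ordering over every column of the schema. Unlike a
    // raw codegen'd BaseOrdering, this value survives serialization: the
    // compiled comparator is regenerated in readObject on arrival.
    val ord: Ordering[InternalRow] = LazilyGeneratedOrdering.forSchema(schema)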

http://git-wip-us.apache.org/repos/asf/spark/blob/19dc69de/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 97f65f1..e30adef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors.attachTree
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.util.MutablePair
@@ -206,10 +207,7 @@ object Exchange {
           val mutablePair = new MutablePair[InternalRow, Null]()
           iter.map(row => mutablePair.update(row.copy(), null))
         }
-        // We need to use an interpreted ordering here because generated orderings cannot be
-        // serialized and this ordering needs to be created on the driver in order to be passed into
-        // Spark core code.
-        implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes)
+        implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
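
The constraint this hunk works around: `RangePartitioner` is constructed on
the driver, and the implicit `Ordering[K]` it captures is serialized to
executors with the shuffle. A toy sketch of that capture, with a hypothetical
string-keyed RDD and length ordering:

    import org.apache.spark.{RangePartitioner, SparkContext}

    def partitionByRange(sc: SparkContext): Array[Array[(String, Int)]] = {
      val pairs = sc.parallelize(Seq("bb" -> 1, "a" -> 2, "cccc" -> 3, "d" -> 4))
      // Whatever Ordering is in implicit scope here is captured by the
      // partitioner and shipped to executors, which is why Exchange needs an
      // ordering that survives serialization rather than a raw codegen'd one.
      implicit val byLength: Ordering[String] = Ordering.by((s: String) => s.length)
      val ranged = pairs.partitionBy(new RangePartitioner(2, pairs))
      ranged.glom().collect() // one array of pairs per key range
    }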

http://git-wip-us.apache.org/repos/asf/spark/blob/19dc69de/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 04daf9d..ef76847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 
 
@@ -88,11 +89,8 @@ case class TakeOrderedAndProject(
 
   override def outputPartitioning: Partitioning = SinglePartition
 
-  // We need to use an interpreted ordering here because generated orderings cannot be serialized
-  // and this ordering needs to be created on the driver in order to be passed into Spark core code.
-  private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
-
   override def executeCollect(): Array[InternalRow] = {
+    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
     if (projectList.isDefined) {
       val proj = UnsafeProjection.create(projectList.get, child.output)
@@ -105,6 +103,7 @@ case class TakeOrderedAndProject(
   private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
 
   protected override def doExecute(): RDD[InternalRow] = {
+    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
     val localTopK: RDD[InternalRow] = {
       child.execute().map(_.copy()).mapPartitions { iter =>
         org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
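
For reference, `takeOrdered` computes a per-partition top-K on the executors
and merges the partial results on the driver, so `ord` crosses the wire in
both hunks above. A toy illustration with a hypothetical integer RDD:

    import org.apache.spark.SparkContext

    def smallestThree(sc: SparkContext): Array[Int] = {
      val nums = sc.parallelize(Seq(7, 1, 5, 3, 9, 2), numSlices = 3)
      // Each partition keeps its own top 3 under the given Ordering; the
      // Ordering itself is serialized to every executor, just like the
      // lazily generated `ord` used by TakeOrderedAndProject.
      nums.takeOrdered(3)(Ordering.Int)
    }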

