You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2016/02/16 19:55:05 UTC
spark git commit: [SPARK-12976][SQL] Add LazilyGeneratedOrdering and
use it for RangePartitioner of Exchange.
Repository: spark
Updated Branches:
refs/heads/master 00c72d27b -> 19dc69de7
[SPARK-12976][SQL] Add LazilyGeneratedOrdering and use it for RangePartitioner of Exchange.
Add `LazilyGeneratedOrdering` to support generated ordering for `RangePartitioner` of `Exchange` instead of `InterpretedOrdering`.
Author: Takuya UESHIN <ue...@happy-camper.st>
Closes #10894 from ueshin/issues/SPARK-12976.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/19dc69de
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/19dc69de
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/19dc69de
Branch: refs/heads/master
Commit: 19dc69de795eb08f3bab4988ad88732bf8ca7bae
Parents: 00c72d2
Author: Takuya UESHIN <ue...@happy-camper.st>
Authored: Tue Feb 16 10:54:44 2016 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Feb 16 10:54:44 2016 -0800
----------------------------------------------------------------------
.../expressions/codegen/GenerateOrdering.scala | 37 ++++++++++++++++++++
.../apache/spark/sql/execution/Exchange.scala | 6 ++--
.../org/apache/spark/sql/execution/limit.scala | 7 ++--
3 files changed, 42 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/19dc69de/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
index 6de5753..5756f20 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
@@ -17,10 +17,13 @@
package org.apache.spark.sql.catalyst.expressions.codegen
+import java.io.ObjectInputStream
+
import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
/**
* Inherits some default implementation for Java from `Ordering[Row]`
@@ -138,3 +141,37 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[BaseOrdering]
}
}
+
+/**
+ * A lazily generated row ordering comparator.
+ */
+class LazilyGeneratedOrdering(val ordering: Seq[SortOrder]) extends Ordering[InternalRow] {
+
+ def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
+ this(ordering.map(BindReferences.bindReference(_, inputSchema)))
+
+ @transient
+ private[this] var generatedOrdering = GenerateOrdering.generate(ordering)
+
+ def compare(a: InternalRow, b: InternalRow): Int = {
+ generatedOrdering.compare(a, b)
+ }
+
+ private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
+ in.defaultReadObject()
+ generatedOrdering = GenerateOrdering.generate(ordering)
+ }
+}
+
+object LazilyGeneratedOrdering {
+
+ /**
+ * Creates a [[LazilyGeneratedOrdering]] for the given schema, in natural ascending order.
+ */
+ def forSchema(schema: StructType): LazilyGeneratedOrdering = {
+ new LazilyGeneratedOrdering(schema.zipWithIndex.map {
+ case (field, ordinal) =>
+ SortOrder(BoundReference(ordinal, field.dataType, nullable = true), Ascending)
+ })
+ }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/19dc69de/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 97f65f1..e30adef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
@@ -206,10 +207,7 @@ object Exchange {
val mutablePair = new MutablePair[InternalRow, Null]()
iter.map(row => mutablePair.update(row.copy(), null))
}
- // We need to use an interpreted ordering here because generated orderings cannot be
- // serialized and this ordering needs to be created on the driver in order to be passed into
- // Spark core code.
- implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes)
+ implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
new RangePartitioner(numPartitions, rddForSampling, ascending = true)
case SinglePartition =>
new Partitioner {
http://git-wip-us.apache.org/repos/asf/spark/blob/19dc69de/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 04daf9d..ef76847 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
@@ -88,11 +89,8 @@ case class TakeOrderedAndProject(
override def outputPartitioning: Partitioning = SinglePartition
- // We need to use an interpreted ordering here because generated orderings cannot be serialized
- // and this ordering needs to be created on the driver in order to be passed into Spark core code.
- private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output)
-
override def executeCollect(): Array[InternalRow] = {
+ val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
if (projectList.isDefined) {
val proj = UnsafeProjection.create(projectList.get, child.output)
@@ -105,6 +103,7 @@ case class TakeOrderedAndProject(
private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
protected override def doExecute(): RDD[InternalRow] = {
+ val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
val localTopK: RDD[InternalRow] = {
child.execute().map(_.copy()).mapPartitions { iter =>
org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org