Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/11/18 07:23:26 UTC

[GitHub] [spark] xuzikun2003 commented on a change in pull request #29725: [SPARK-32096][SQL] Improve sorting performance of Spark SQL window function by removing window partition key from sort order

xuzikun2003 commented on a change in pull request #29725:
URL: https://github.com/apache/spark/pull/29725#discussion_r525862140



##########
File path: sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowWindowSorter.java
##########
@@ -0,0 +1,453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.io.IOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.collection.Iterator;
+import scala.math.Ordering;
+
+import org.apache.spark.memory.SparkOutOfMemoryError;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
+import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
+
+public final class UnsafeExternalRowWindowSorter extends AbstractUnsafeExternalRowSorter {
+
+  private static final Logger logger = LoggerFactory.getLogger(UnsafeExternalRowWindowSorter.class);
+
+  private final StructType schema;
+  private final UnsafeProjection partitionSpecProjection;
+  private final Ordering<InternalRow> orderingOfPartitionKey;
+  private final Ordering<InternalRow> orderingInWindow;
+  private final Ordering<InternalRow> orderingAcrossWindows;
+  private final PrefixComparator prefixComparatorInWindow;
+  private final UnsafeExternalRowSorter.PrefixComputer prefixComputerInWindow;
+  private final boolean canUseRadixSortInWindow;
+  private final long pageSizeBytes;
+  private static final int windowSorterMapMaxSize = 1;

Review comment:
       @maropu, @opensky142857, here is why we set windowSorterMapMaxSize to 1 and why we reduce the page size of each sorter.
   
   Each UnsafeExternalRowSorter uses its own memory consumer. When the first row is inserted into an UnsafeExternalRowSorter, its memory consumer allocates a whole page to that sorter. In our perf run of TPCDS100TB, the default page size is 64MB. If we insert only a few rows into the sorter for a window, most of that page is wasted, and the unnecessary memory allocation also adds significant overhead. That is why we do two things in this PR:
   1. Keep the number of window sorters small
   2. Decrease the page size of each window sorter.
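   
   As a rough illustration of the waste described above, here is a minimal arithmetic sketch. It is not Spark code: the class and method names, the 100 sorters, the 10 rows per sorter, the 100-byte row size and the 1MB alternative page size are all assumptions chosen for the example. Only the 64MB page size comes from the perf run mentioned above.

```java
// Hypothetical sketch, not Spark code: estimates the memory that is reserved
// but unused when every window sorter eagerly takes one full page.
final class PageWasteEstimate {

  /** Bytes reserved but unused when each sorter holds one page and only a few rows. */
  static long wastedBytes(int sorters, long pageSizeBytes, long rowsPerSorter, long rowSizeBytes) {
    long usedPerSorter = Math.min(pageSizeBytes, rowsPerSorter * rowSizeBytes);
    return sorters * (pageSizeBytes - usedPerSorter);
  }

  public static void main(String[] args) {
    long page64Mb = 64L * 1024 * 1024; // page size quoted in the comment above
    long page1Mb = 1024L * 1024;       // assumed smaller page size, for contrast only

    // Assumed workload: 10 rows of 100 bytes in each sorter.
    System.out.println("100 sorters, 64MB pages: " + wastedBytes(100, page64Mb, 10, 100));
    System.out.println("100 sorters, 1MB pages:  " + wastedBytes(100, page1Mb, 10, 100));
    System.out.println("1 sorter, 64MB page:     " + wastedBytes(1, page64Mb, 10, 100));
  }
}
```

   Under these assumptions the unused memory grows with both the number of sorters and the page size, which is the motivation for the two changes listed above.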
   
   There are two directions we can take to address this problem.
   
   One direction is to let these window sorters share the same memory consumer. That way we won't allocate many big pages that each hold very few rows.
   
   The second direction is to keep only one window sorter for each physical partition.
   
   Here is why we chose the second direction. When we run TPCDS100TB, we do not see the Spark engine being slow at sorting many windows in a physical partition. We see it being slow at sorting a single window in a single physical partition (q67 is such a case), where the executor does a lot of unnecessary comparisons on the window partition key. To address the slowness we observe, we follow the second direction and keep only one window sorter for each physical partition. This single window sorter does not need to compare the window partition key.
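   
   The redundant comparison can be sketched in plain Java. This is an illustration under stated assumptions, not the sorter in this PR: the Row class, the "store_1" key, the sample values and the comparison counter are all hypothetical. It shows that when every row belongs to the same window, ordering by (partition key, sort column) and ordering by the sort column alone give the same result, while the first ordering also compares the key on every call.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch, not Spark code.
final class WindowKeyComparisonSketch {

  /** Hypothetical row: a window partition key plus one in-window sort column. */
  static final class Row {
    final String partitionKey;
    final int orderValue;

    Row(String partitionKey, int orderValue) {
      this.partitionKey = partitionKey;
      this.orderValue = orderValue;
    }
  }

  /** Counts how many times the partition key has been compared. */
  static long keyComparisons = 0;

  /** Ordering across windows: partition key first, then the in-window column. */
  static final Comparator<Row> ACROSS_WINDOWS = (a, b) -> {
    keyComparisons++;
    int byKey = a.partitionKey.compareTo(b.partitionKey);
    return byKey != 0 ? byKey : Integer.compare(a.orderValue, b.orderValue);
  };

  /** Ordering inside one window: the partition key is never touched. */
  static final Comparator<Row> IN_WINDOW =
      (a, b) -> Integer.compare(a.orderValue, b.orderValue);

  /** Sorts a copy of the rows and returns the in-window column in sorted order. */
  static List<Integer> sortedValues(List<Row> rows, Comparator<Row> cmp) {
    List<Row> copy = new ArrayList<>(rows);
    copy.sort(cmp);
    List<Integer> out = new ArrayList<>();
    for (Row r : copy) {
      out.add(r.orderValue);
    }
    return out;
  }

  /** Builds a physical partition that contains a single window. */
  static List<Row> singleWindow() {
    List<Row> rows = new ArrayList<>();
    for (int v : new int[] {5, 3, 9, 1, 7}) {
      rows.add(new Row("store_1", v));
    }
    return rows;
  }

  public static void main(String[] args) {
    List<Row> rows = singleWindow();
    List<Integer> full = sortedValues(rows, ACROSS_WINDOWS);
    long counted = keyComparisons;
    List<Integer> inWindow = sortedValues(rows, IN_WINDOW);
    System.out.println("same order: " + full.equals(inWindow));
    System.out.println("key comparisons with the full ordering: " + counted);
  }
}
```

   In this sketch every key comparison returns 0, so the work spent on the key never influences the result.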
   
   Perhaps I can rename these parameters to avoid confusion. What do you think?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {

Review comment:
       Sure, will add.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
##########
@@ -124,7 +125,18 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) {
         child
       } else {
-        SortExec(requiredOrdering, global = false, child = child)
+        operator match {
+          case WindowExec(_, partitionSpec, orderSpec, _)
+            if (!partitionSpec.isEmpty && !orderSpec.isEmpty) =>

Review comment:
       Thanks, will fix it.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
 import org.apache.spark.sql.execution.metric.SQLMetrics

Review comment:
       Both WindowSortExec and SortExec perform sorting at execution time, so I want to keep the Exec suffix to indicate that this is a query execution operator.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -55,53 +189,64 @@ case class SortExec(
   override def requiredChildDistribution: Seq[Distribution] =
     if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
 
-  private val enableRadixSort = sqlContext.conf.enableRadixSort
+  val enableRadixSort = sqlContext.conf.enableRadixSort
+
+  lazy val boundSortExpression = BindReferences.bindReference(sortOrder.head, output)
+  lazy val ordering = RowOrdering.create(sortOrder, output)
+  lazy val sortPrefixExpr = SortPrefix(boundSortExpression)
+
+  // The comparator for comparing prefix
+  lazy val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression)
+
+  // The generator for prefix
+  lazy val prefixComputer = createPrefixComputer(sortPrefixExpr)
+
+  lazy val canUseRadixSort = enableRadixSort && sortOrder.length == 1 &&
+    SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression)
+
+  lazy val pageSize = SparkEnv.get.memoryManager.pageSizeBytes

Review comment:
       Sure, I will add it.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows across
+ *                               different windows on a Spark physical partition.
+ *                               This sequence of sort orders is obtained from a partition
+ *                               key plus a sequence of sort orders inside a window
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,
+    child: SparkPlan,
+    testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrderAcrossWindows,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowWindowSorter = {
+    val partitionSpecGrouping = UnsafeProjection.create(partitionSpec, output)
+
+    // The schema of partition key
+    val partitionKeySchema: Seq[Attribute] = output.filter(x => {
+      x.references.subsetOf(AttributeSet(partitionSpec))
+    })
+
+    // Generate the ordering of partition key
+    val orderingOfPartitionKey = RowOrdering.create(
+      sortOrderAcrossWindows diff sortOrderInWindow,
+      partitionKeySchema)
+
+    // No prefix comparator
+    val nullPrefixComparator = new PrefixComparator {
+      override def compare(prefix1: Long, prefix2: Long): Int = 0
+    }
+
+    if (sortOrderInWindow == null || sortOrderInWindow.size == 0) {

Review comment:
       We don't run WindowSortExec when sortOrderInWindow == null or sortOrderInWindow.size == 0. We run the original SortExec if there is no need to sort within each group.
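   
   A minimal sketch of that selection rule, written in Java for illustration only. The enum, the method name and the use of strings for expressions are hypothetical; the condition mirrors the guard quoted from EnsureRequirements in this thread (a non-empty partition spec and a non-empty order spec), and the sketch assumes the operator being planned is a window operator.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not Spark code.
final class SortOperatorChoiceSketch {

  /** Hypothetical stand-ins for the two physical sort operators. */
  enum SortOperator { SORT_EXEC, WINDOW_SORT_EXEC }

  /**
   * Picks the window-aware sort only when there is both a partition key
   * and an ordering inside the window; otherwise falls back to the plain sort.
   */
  static SortOperator choose(List<String> partitionSpec, List<String> orderSpec) {
    boolean hasPartitionKey = partitionSpec != null && !partitionSpec.isEmpty();
    boolean hasInWindowOrder = orderSpec != null && !orderSpec.isEmpty();
    return hasPartitionKey && hasInWindowOrder
        ? SortOperator.WINDOW_SORT_EXEC
        : SortOperator.SORT_EXEC;
  }

  public static void main(String[] args) {
    List<String> key = Arrays.asList("store_id");
    List<String> order = Arrays.asList("sales DESC");
    List<String> none = Collections.emptyList();

    System.out.println(choose(key, order));  // partition key and in-window order
    System.out.println(choose(key, none));   // nothing to sort inside the window
    System.out.println(choose(none, order)); // no partition key
  }
}
```

   With a rule like this, the empty-ordering branch inside the window sorter would not be reached in practice.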

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
##########
@@ -42,6 +43,139 @@ case class SortExec(
     global: Boolean,
     child: SparkPlan,
     testSpillFrequency: Int = 0)
+  extends SortExecBase(
+    sortOrder,
+    global,
+    child,
+    testSpillFrequency) {
+
+  def createSorter(): UnsafeExternalRowSorter = {
+    rowSorter = UnsafeExternalRowSorter.create(
+      schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)
+
+    if (testSpillFrequency > 0) {
+      rowSorter.setTestSpillFrequency(testSpillFrequency)
+    }
+    rowSorter.asInstanceOf[UnsafeExternalRowSorter]
+  }
+
+  override protected def doProduce(ctx: CodegenContext): String = {
+    doProduce(ctx, classOf[UnsafeExternalRowSorter].getName)
+  }
+}
+
+/**
+ * Performs (external) sorting for multiple windows.
+ *
+ * @param partitionSpec a sequence of expressions that defines a partition key
+ * @param sortOrderInWindow a sequence of sort orders for sorting rows inside a window
+ * @param sortOrderAcrossWindows a sequence of sort orders for sorting rows across
+ *                               different windows on a Spark physical partition.
+ *                               This sequence of sort orders is obtained from a partition
+ *                               key plus a sequence of sort orders inside a window
+ * @param global when true performs a global sort of all partitions by shuffling the data first
+ *               if necessary.
+ * @param testSpillFrequency Method for configuring periodic spilling in unit tests. If set, will
+ *                           spill every `frequency` records.
+ */
+case class WindowSortExec(
+    partitionSpec: Seq[Expression],
+    sortOrderInWindow: Seq[SortOrder],
+    sortOrderAcrossWindows: Seq[SortOrder],
+    global: Boolean,

Review comment:
       You are right, we can remove this parameter.



