You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2022/07/25 03:45:18 UTC

[spark] branch branch-3.3 updated: [SPARK-39856][SQL][TESTS] Increase the number of partitions in TPC-DS build to avoid out-of-memory

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new c7e2604b098 [SPARK-39856][SQL][TESTS] Increase the number of partitions in TPC-DS build to avoid out-of-memory
c7e2604b098 is described below

commit c7e2604b098d153b98825db6e049e3e1a515a148
Author: Hyukjin Kwon <gu...@apache.org>
AuthorDate: Mon Jul 25 12:44:54 2022 +0900

    [SPARK-39856][SQL][TESTS] Increase the number of partitions in TPC-DS build to avoid out-of-memory
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to avoid out-of-memory in TPC-DS build at GitHub Actions CI by:
    
    - Increasing the number of partitions being used in shuffle.
    - Truncating floating-point values after the 10th decimal place.
        The number of partitions was previously set to 1 because of small differences in result precision, which we can generally just ignore.
    - Sort the results regardless of join type since Apache Spark does not guarantee the order of results
    
    ### Why are the changes needed?
    
    One of the reasons for the large memory usage seems to be the single partition that is being used in the shuffle.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, test-only.
    
    ### How was this patch tested?
    
    The GitHub Actions CI in this PR will test it out.
    
    Closes #37270 from HyukjinKwon/deflake-tpcds.
    
    Authored-by: Hyukjin Kwon <gu...@apache.org>
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
    (cherry picked from commit 7358253755762f9bfe6cedc1a50ec14616cfeace)
    Signed-off-by: Hyukjin Kwon <gu...@apache.org>
---
 .../org/apache/spark/sql/TPCDSQueryTestSuite.scala | 23 ++++++++++------------
 1 file changed, 10 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
index 8019fc98a52..92cf574781f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TPCDSQueryTestSuite.scala
@@ -62,7 +62,7 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
 
   // To make output results deterministic
   override protected def sparkConf: SparkConf = super.sparkConf
-    .set(SQLConf.SHUFFLE_PARTITIONS.key, "1")
+    .set(SQLConf.SHUFFLE_PARTITIONS.key, 4.toString)
 
   protected override def createSparkSession: TestSparkSession = {
     new TestSparkSession(new SparkContext("local[1]", this.getClass.getSimpleName, sparkConf))
@@ -105,7 +105,6 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
       query: String,
       goldenFile: File,
       conf: Map[String, String]): Unit = {
-    val shouldSortResults = sortMergeJoinConf != conf  // Sort for other joins
     withSQLConf(conf.toSeq: _*) {
       try {
         val (schema, output) = handleExceptions(getNormalizedResult(spark, query))
@@ -143,17 +142,15 @@ class TPCDSQueryTestSuite extends QueryTest with TPCDSBase with SQLQueryTestHelp
         assertResult(expectedSchema, s"Schema did not match\n$queryString") {
           schema
         }
-        if (shouldSortResults) {
-          val expectSorted = expectedOutput.split("\n").sorted.map(_.trim)
-            .mkString("\n").replaceAll("\\s+$", "")
-          val outputSorted = output.sorted.map(_.trim).mkString("\n").replaceAll("\\s+$", "")
-          assertResult(expectSorted, s"Result did not match\n$queryString") {
-            outputSorted
-          }
-        } else {
-          assertResult(expectedOutput, s"Result did not match\n$queryString") {
-            outputString
-          }
+        // Truncate precisions because they can be vary per how the shuffle is performed.
+        val expectSorted = expectedOutput.split("\n").sorted.map(_.trim)
+          .mkString("\n").replaceAll("\\s+$", "")
+          .replaceAll("""([0-9]+.[0-9]{10})([0-9]*)""", "$1")
+        val outputSorted = output.sorted.map(_.trim).mkString("\n")
+          .replaceAll("\\s+$", "")
+          .replaceAll("""([0-9]+.[0-9]{10})([0-9]*)""", "$1")
+        assertResult(expectSorted, s"Result did not match\n$queryString") {
+          outputSorted
         }
       } catch {
         case e: Throwable =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org