You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/12/29 00:33:49 UTC

[spark] branch branch-3.1 updated: [SPARK-33916][CORE] Fix fallback storage offset and improve compression codec test coverage

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 403a4c2  [SPARK-33916][CORE] Fix fallback storage offset and improve compression codec test coverage
403a4c2 is described below

commit 403a4c2859e778cb56612adaf3169202f1541bd3
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Dec 28 16:33:01 2020 -0800

    [SPARK-33916][CORE] Fix fallback storage offset and improve compression codec test coverage
    
    ### What changes were proposed in this pull request?
    
    This PR aims to fix an offset bug and improve compression codec test coverage.
    
    ### Why are the changes needed?
    
    When the user chooses a non-default codec, it causes a failure.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the extended test suite.
    
    Closes #30934 from dongjoon-hyun/SPARK-33916.
    
    Authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit 6497ccbbda1874187ee60a4f6368e6d9ae6580ff)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/storage/FallbackStorage.scala |  2 +-
 .../spark/storage/FallbackStorageSuite.scala       | 67 +++++++++++-----------
 2 files changed, 36 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 9221731..4112635 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -158,7 +158,7 @@ object FallbackStorage extends Logging {
         val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
         val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
         val f = fallbackFileSystem.open(dataFile)
-        val size = nextOffset - 1 - offset
+        val size = nextOffset - offset
         logDebug(s"To byte array $size")
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 2eeae2e..c07edb6 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -59,6 +59,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
   test("fallback storage APIs - copy/exists") {
     val conf = new SparkConf(false)
       .set("spark.app.id", "testId")
+      .set(SHUFFLE_COMPRESS, false)
       .set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
       .set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
         Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
@@ -227,43 +228,45 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
-  test("Newly added executors should access old data from remote storage") {
-    sc = new SparkContext(getSparkConf(2, 0))
-    withSpark(sc) { sc =>
-      TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
-      val rdd1 = sc.parallelize(1 to 10, 2)
-      val rdd2 = rdd1.map(x => (x % 2, 1))
-      val rdd3 = rdd2.reduceByKey(_ + _)
-      assert(rdd3.collect() === Array((0, 5), (1, 5)))
+  Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
+    test(s"$codec - Newly added executors should access old data from remote storage") {
+      sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
+      withSpark(sc) { sc =>
+        TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+        val rdd1 = sc.parallelize(1 to 10, 2)
+        val rdd2 = rdd1.map(x => (x % 2, 1))
+        val rdd3 = rdd2.reduceByKey(_ + _)
+        assert(rdd3.collect() === Array((0, 5), (1, 5)))
+
+        // Decommission all
+        val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+        sc.getExecutorIds().foreach {
+          sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
+        }
 
-      // Decommission all
-      val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
-      sc.getExecutorIds().foreach {
-        sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
-      }
+        // Make it sure that fallback storage are ready
+        val fallbackStorage = new FallbackStorage(sc.getConf)
+        eventually(timeout(10.seconds), interval(1.seconds)) {
+          Seq(
+            "shuffle_0_0_0.index", "shuffle_0_0_0.data",
+            "shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
+            assert(fallbackStorage.exists(0, file))
+          }
+        }
 
-      // Make it sure that fallback storage are ready
-      val fallbackStorage = new FallbackStorage(sc.getConf)
-      eventually(timeout(10.seconds), interval(1.seconds)) {
-        Seq(
-          "shuffle_0_0_0.index", "shuffle_0_0_0.data",
-          "shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
-          assert(fallbackStorage.exists(0, file))
+        // Since the data is safe, force to shrink down to zero executor
+        sc.getExecutorIds().foreach { id =>
+          sched.killExecutor(id)
+        }
+        eventually(timeout(20.seconds), interval(1.seconds)) {
+          assert(sc.getExecutorIds().isEmpty)
         }
-      }
 
-      // Since the data is safe, force to shrink down to zero executor
-      sc.getExecutorIds().foreach { id =>
-        sched.killExecutor(id)
-      }
-      eventually(timeout(20.seconds), interval(1.seconds)) {
-        assert(sc.getExecutorIds().isEmpty)
+        // Dynamic allocation will start new executors
+        assert(rdd3.collect() === Array((0, 5), (1, 5)))
+        assert(rdd3.sortByKey().count() == 2)
+        assert(sc.getExecutorIds().nonEmpty)
       }
-
-      // Dynamic allocation will start new executors
-      assert(rdd3.collect() === Array((0, 5), (1, 5)))
-      assert(rdd3.sortByKey().count() == 2)
-      assert(sc.getExecutorIds().nonEmpty)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org