Posted to commits@spark.apache.org by do...@apache.org on 2020/12/29 00:33:32 UTC
[spark] branch master updated: [SPARK-33916][CORE] Fix fallback storage offset and improve compression codec test coverage
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6497ccb [SPARK-33916][CORE] Fix fallback storage offset and improve compression codec test coverage
6497ccb is described below
commit 6497ccbbda1874187ee60a4f6368e6d9ae6580ff
Author: Dongjoon Hyun <dh...@apple.com>
AuthorDate: Mon Dec 28 16:33:01 2020 -0800
[SPARK-33916][CORE] Fix fallback storage offset and improve compression codec test coverage
### What changes were proposed in this pull request?
This PR aims to fix an offset bug and improve compression codec test coverage.
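For illustration only (hypothetical offsets, not taken from the patch), the off-by-one looks like this:

```scala
// A shuffle index file stores cumulative byte offsets; block i spans
// [offsets(i), offsets(i + 1)) in the data file.
val offsets = Seq(0L, 100L, 250L)          // hypothetical index entries
val (offset, nextOffset) = (offsets(1), offsets(2))
val buggySize = nextOffset - 1 - offset    // 149: silently drops the block's last byte
val fixedSize = nextOffset - offset        // 150: the full block as written by the mapper
```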
### Why are the changes needed?
When the user chooses a non-default compression codec, reading shuffle data from the fallback storage fails.
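A non-default codec is selected through the standard `spark.io.compression.codec` setting (the test changes below use the equivalent `IO_COMPRESSION_CODEC` config constant); a minimal sketch:

```scala
import org.apache.spark.SparkConf

// Spark ships lz4 (the default), lzf, snappy, and zstd codecs; the new tests
// exercise each of them against the fallback storage path.
val conf = new SparkConf()
  .set("spark.io.compression.codec", "zstd")
```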
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Pass the extended test suite.
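One way to run this suite locally (assuming the usual Spark sbt build) is:

```
./build/sbt "core/testOnly org.apache.spark.storage.FallbackStorageSuite"
```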
Closes #30934 from dongjoon-hyun/SPARK-33916.
Authored-by: Dongjoon Hyun <dh...@apple.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../org/apache/spark/storage/FallbackStorage.scala | 2 +-
.../spark/storage/FallbackStorageSuite.scala | 67 +++++++++++-----------
2 files changed, 36 insertions(+), 33 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 9221731..4112635 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -158,7 +158,7 @@ object FallbackStorage extends Logging {
val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$name")
val f = fallbackFileSystem.open(dataFile)
- val size = nextOffset - 1 - offset
+ val size = nextOffset - offset
logDebug(s"To byte array $size")
val array = new Array[Byte](size.toInt)
val startTimeNs = System.nanoTime()
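For context, a minimal sketch of how the two offsets above are typically obtained (an assumption about the index file layout; this is not the actual FallbackStorage code): the shuffle index file is a sequence of 8-byte longs, where entry i is the byte offset at which reduce partition i starts in the data file.

```scala
import java.io.{DataInputStream, FileInputStream}

// Sketch: read two consecutive index entries and derive the block length.
def blockSize(indexFile: String, reduceId: Int): Long = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)      // jump to the entry for this reduce partition
    val offset = in.readLong()      // where the block starts in the data file
    val nextOffset = in.readLong()  // where the next block starts
    nextOffset - offset             // exact block length; no "- 1"
  } finally {
    in.close()
  }
}
```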
diff --git a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
index 2eeae2e..c07edb6 100644
--- a/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala
@@ -59,6 +59,7 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
test("fallback storage APIs - copy/exists") {
val conf = new SparkConf(false)
.set("spark.app.id", "testId")
+ .set(SHUFFLE_COMPRESS, false)
.set(STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH,
Files.createTempDirectory("tmp").toFile.getAbsolutePath + "/")
@@ -227,43 +228,45 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
}
}
- test("Newly added executors should access old data from remote storage") {
- sc = new SparkContext(getSparkConf(2, 0))
- withSpark(sc) { sc =>
- TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
- val rdd1 = sc.parallelize(1 to 10, 2)
- val rdd2 = rdd1.map(x => (x % 2, 1))
- val rdd3 = rdd2.reduceByKey(_ + _)
- assert(rdd3.collect() === Array((0, 5), (1, 5)))
+ Seq("lz4", "lzf", "snappy", "zstd").foreach { codec =>
+ test(s"$codec - Newly added executors should access old data from remote storage") {
+ sc = new SparkContext(getSparkConf(2, 0).set(IO_COMPRESSION_CODEC, codec))
+ withSpark(sc) { sc =>
+ TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
+ val rdd1 = sc.parallelize(1 to 10, 2)
+ val rdd2 = rdd1.map(x => (x % 2, 1))
+ val rdd3 = rdd2.reduceByKey(_ + _)
+ assert(rdd3.collect() === Array((0, 5), (1, 5)))
+
+ // Decommission all
+ val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+ sc.getExecutorIds().foreach {
+ sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
+ }
- // Decommission all
- val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
- sc.getExecutorIds().foreach {
- sched.decommissionExecutor(_, ExecutorDecommissionInfo(""), false)
- }
+ // Make it sure that fallback storage are ready
+ val fallbackStorage = new FallbackStorage(sc.getConf)
+ eventually(timeout(10.seconds), interval(1.seconds)) {
+ Seq(
+ "shuffle_0_0_0.index", "shuffle_0_0_0.data",
+ "shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
+ assert(fallbackStorage.exists(0, file))
+ }
+ }
- // Make it sure that fallback storage are ready
- val fallbackStorage = new FallbackStorage(sc.getConf)
- eventually(timeout(10.seconds), interval(1.seconds)) {
- Seq(
- "shuffle_0_0_0.index", "shuffle_0_0_0.data",
- "shuffle_0_1_0.index", "shuffle_0_1_0.data").foreach { file =>
- assert(fallbackStorage.exists(0, file))
+ // Since the data is safe, force to shrink down to zero executor
+ sc.getExecutorIds().foreach { id =>
+ sched.killExecutor(id)
+ }
+ eventually(timeout(20.seconds), interval(1.seconds)) {
+ assert(sc.getExecutorIds().isEmpty)
}
- }
- // Since the data is safe, force to shrink down to zero executor
- sc.getExecutorIds().foreach { id =>
- sched.killExecutor(id)
- }
- eventually(timeout(20.seconds), interval(1.seconds)) {
- assert(sc.getExecutorIds().isEmpty)
+ // Dynamic allocation will start new executors
+ assert(rdd3.collect() === Array((0, 5), (1, 5)))
+ assert(rdd3.sortByKey().count() == 2)
+ assert(sc.getExecutorIds().nonEmpty)
}
-
- // Dynamic allocation will start new executors
- assert(rdd3.collect() === Array((0, 5), (1, 5)))
- assert(rdd3.sortByKey().count() == 2)
- assert(sc.getExecutorIds().nonEmpty)
}
}
}