Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/13 12:40:11 UTC

[GitHub] [spark] attilapiros commented on a change in pull request #24499: [SPARK-27677][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation

URL: https://github.com/apache/spark/pull/24499#discussion_r283325216
 
 

 ##########
 File path: core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala
 ##########
 @@ -92,4 +95,40 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     }
     e.getMessage should include ("Fetch failure will not retry stage due to testing config")
   }
+
+  test("SPARK-25888: using external shuffle service fetching disk persisted blocks") {
+    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+    sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
+    sc.env.blockManager.shuffleClient.getClass should equal(classOf[ExternalShuffleClient])
+
+    val rdd = sc.parallelize(0 until 100, 2)
+      .map { i => (i, 1) }
+      .persist(StorageLevel.DISK_ONLY)
+
+    rdd.count()
+
+    val blockId = RDDBlockId(rdd.id, 0)
+    eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val locations = sc.env.blockManager.master.getLocations(blockId)
+      assert(locations.size === 2)
+      assert(locations.map(_.port).contains(server.getPort),
+        "external shuffle service port should be contained")
+    }
+
+    sc.killExecutors(sc.getExecutorIds())
+
+    eventually(timeout(2.seconds), interval(100.milliseconds)) {
+      val locations = sc.env.blockManager.master.getLocations(blockId)
+      assert(locations.size === 1)
+      assert(locations.map(_.port).contains(server.getPort),
+        "external shuffle service port should be contained")
+    }
+
+    val rddSplit0Block = sc.env.blockManager.getRemoteValues(blockId)
+    assert(rddSplit0Block.isDefined)
+
+    // Invalidate the registered executors, disallowing access to their shuffle blocks (without
+    // deleting the actual shuffle files, so we could access them without the shuffle service).
+    rpcHandler.applicationRemoved(sc.conf.getAppId, false /* cleanupLocalDirs */)
 
 Review comment:
   Thanks, yes, this comment is wrong and we can clean up the local dirs here too.
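   
   A minimal sketch of how that could look (the boolean is the existing `cleanupLocalDirs` parameter of `applicationRemoved`; the surrounding test is assumed unchanged from the diff above):
   
   ```scala
   // Invalidate the registered executors and clean up their local dirs as well,
   // since the persisted blocks are not accessed again after this point.
   rpcHandler.applicationRemoved(sc.conf.getAppId, true /* cleanupLocalDirs */)
   ```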

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org