You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/04/30 16:18:50 UTC

[GitHub] [spark] attilapiros opened a new pull request #24499: [SPARK-25888][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation

attilapiros opened a new pull request #24499: [SPARK-25888][Core] Serve local disk persisted blocks by the external service after releasing executor by dynamic allocation
URL: https://github.com/apache/spark/pull/24499
 
 
   # What changes were proposed in this pull request?
   
   ## Problem statement
   
   An executor which has persisted blocks does not consider to be idle and this way ready to be released by dynamic allocation after the regular timeout `spark.dynamicAllocation.executorIdleTimeout` but there is separate configuration `spark.dynamicAllocation.cachedExecutorIdleTimeout` which defaults to `Integer.MAX_VALUE`. This is because releasing the executor also means losing the persisted blocks (as the metadata for individual blocks called `BlockInfo` are kept in memory) and when the RDD is referenced latter on this lost blocks will be recomputed. 
   On the other hand keeping the executors too long without any task to work on is also a waste of resources (as executors are reserved for the application by the resource manager).
   
   ## Solution
   
   This PR focuses on the first part of SPARK-25888: it extends the external shuffle service with the capability to serve RDD blocks which are persisted on the local disk store by the executors. Moreover when a block is reported to be persisted on to disk the external shuffle service instance running on the same host as the executor is also registered (along with the reporting block manager) as a possible location for fetching it. 
   
   ## Some implementation detail
   
   Some explanation about the decisions made during the development:
   - Introducing `BlockTransferClient` and `BlockTransferClientSync` instead of moving `BlockTransferService#fetchBlockSync()` method to `ShuffleClient`:
   `ShuffleClient` is in the `spark-network-shuffle` artifact where neither `EncryptedManagedBuffer` and `SparkException` (used in the `awaitResult()` which should have been reimplemented here with a Java promise) are not available.
   - the location list to fetch a block was randomized but the groups (same host, same rack, others) order was kept. In this PR the order of groups are kept and external shuffle service added to the end of the each group.
   - `BlockManagerInfo` is not introduced for external shuffle service but only a lightweight solution is taken. A hash map from `BlockId` to `BlockStatus` is introduced. A type alias would make the source more readable but I know it is discouraged. On the other hand a new class wrapping this hash map would introduce unnecessary indirection.  
   
   # How was this patch tested?
   
   ## Unit tests
   
   ### ExternalShuffleServiceSuite
   
   Here the complete use case is tested by the "SPARK-25888: using external shuffle service fetching disk persisted blocks" with a tiny difference: here the executor is killed manually, this way the test is a bit faster than waiting for the idle timeout.
   
   ### ExternalShuffleBlockHandlerSuite
   
   Tests the fetching of the RDD blocks via the external shuffle service.
   
   ### BlockManagerInfoSuite 
   
   This a new suite. As the `BlockManagerInfo` behaviour depends very much on whether the external shuffle service enabled or not all the tests are executed with and without it.
   
   ### BlockManagerSuite
   
   Tests the sorting of the block locations. 
   
   ## Manually on YARN
   
   Spark App was:
   
   ~~~scala
   package com.mycompany
   
   import org.apache.spark.rdd.RDD
   import org.apache.spark.{SparkContext, SparkConf}
   import org.apache.spark.storage.StorageLevel
   
   object TestAppDiskOnlyLevel {
     def main(args: Array[String]): Unit = {
       val conf = new SparkConf().setAppName("test-app")
   
       println("Attila: START")
       val sc = new SparkContext(conf)
       val rdd = sc.parallelize(0 until 100, 10)
         .map { i =>
           println(s"Attila: calculate first rdd i=$i")
           Thread.sleep(1000)
           i
         }
   
       rdd.persist(StorageLevel.DISK_ONLY)
       rdd.count()
   
       println("Attila: First RDD is processed, waiting for 60 sec")
   
       Thread.sleep(60 * 1000)
   
       println("Attila: Num executors must be 0 as executorIdleTimeout is way over")
   
       val rdd2 = sc.parallelize(0 until 10, 1)
         .map(i => (i, 1))
         .persist(StorageLevel.DISK_ONLY)
   
       rdd2.count()
   
       println("Attila: Second RDD with one partition (only one executors must be alive)")
       println("Attila: Calling collect on the first RDD:")
   
       rdd.collect()
   
       println("Attila: STOP")
     }
   }
   ~~~
   
   I have submitted with the following configuration:
   
   ~~~bash
   spark-submit --master yarn \
     --conf spark.dynamicAllocation.enabled=true \
     --conf spark.dynamicAllocation.executorIdleTimeout=30 \
     --conf spark.dynamicAllocation.cachedExecutorIdleTimeout=90 \
     --class com.mycompany.TestAppDiskOnlyLevel dyn_alloc_demo-core_2.11-0.1.0-SNAPSHOT-jar-with-dependencies.jar
   ~~~
   
   Checked the result by filtering for the side effect of the task calculations:
   
   ~~~bash
   [user@server ~]$ yarn logs -applicationId application_1556299359453_0001 | grep "Attila: calculate" | wc -l
   WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
   19/04/26 10:31:59 INFO client.RMProxy: Connecting to ResourceManager at apiros-1.gce.company.com/172.31.115.165:8032
   100
   ~~~
   
   So it is only 100 task execution and not 200 (which would be the case for re-computation).
   
   Moreover from the submit/launcher log we can see executors really stopped in between (see the new total is 0 before the last line): 
   ~~~
   [user@server ~]$ grep "Attila: Num executors must be 0" -B 2 spark-submit.log
   19/04/26 10:24:27 INFO cluster.YarnScheduler: Executor 9 on apiros-3.gce.company.com killed by driver.
   19/04/26 10:24:27 INFO spark.ExecutorAllocationManager: Existing executor 9 has been removed (new total is 0)
   Attila: Num executors must be 0 as executorIdleTimeout is way over
   ~~~
   
   [Full spark submit log](https://github.com/attilapiros/spark/files/3122465/spark-submit.log)
   
   I have done a test also after changing the `DISK_ONLY` storage level to `MEMORY_ONLY` for the first RDD. After this change during the 60sec waiting no executor was removed. 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org