You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Arnaud Nauwynck (Jira)" <ji...@apache.org> on 2024/01/10 19:49:00 UTC

[jira] [Updated] (SPARK-46659) Add customizable TaskScheduling param, to avoid randomly choosing executor for tasks, and downscale on low micro-batches activity

     [ https://issues.apache.org/jira/browse/SPARK-46659?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arnaud Nauwynck updated SPARK-46659:
------------------------------------
    Description: 
When using dynamicAllocation (but not spark.decommission.enabled=true) with a micro-batches activity, very small tasks are arriving at regular interval, and are processed extremely quickly.
The flow of events that are processed may consume less than 1% of the cpu of the cluster.
But globally, the number of executors stay at a high level (spark.dynamicAllocation.maxExecutors) eventhough they are all 99% of the time IDDLE.

Unfortunatly, in the current code, tasks are assigned randomly to executors, so a constant flow of very small tasks maintain artificially in an "active" status all the executors: 
all executors are receiving tasks from time to time, so strictly speaking, they are never considered as IDDLE during a duration longer than "spark.dynamicAllocation.executorIdleTimeout". 

Therefore, executors are never marked as candidate for decommissioning, and they continue to receive tasks forever, while thoses tasks could easily be assigned to any other executor (chosen not randomly).


The proposition is therefore to add a new configuration property to suppress the random shuffling of assignable offers for task.

see this code [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773]
```java
  /**
   * Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
   * overriding in tests, so it can be deterministic.
   */
  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    Random.shuffle(offers)
  }
```

It could be replaced simply by
```java
    val SKIP_RANDOMIZE_WORKER_OFFERS =  ConfigBuilder("spark.task.skipRandomizeWorkerOffers")
      .version("3.6.0")
      .booleanConf
      .createWithDefault(false)
..

    val skipRandomizeWorkerOffers = conf.get(SKIP_RANDOMIZE_WORKER_OFFERS)

..

  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    if (skipRandomizeWorkerOffers) {
       offers
    } else {
       Random.shuffle(offers)
    }
  }
```


  was:
When using dynamicAllocation (but not spark.decommission.enabled=true) with a micro-batches activity, very small tasks are arriving at regular interval, and are processed extremely quickly.
The flow of events that are processed may consume less than 1% of the cpu of the cluster.
But globally, the number of executors stay at a high level (spark.dynamicAllocation.maxExecutors) eventhough they are all 99% of the time IDDLE.

Unfortunatly, in the current code, tasks are assigned randomly to executors, so a constant flow of very small tasks maintain artificially in an "active" status all the executors: 
all executors are receiving tasks from time to time, so strictly speaking, they are never considered as IDDLE during a duration longer than "spark.dynamicAllocation.executorIdleTimeout". 

Therefore, executors are never marked as candidate for decommissioning, and they continue to receive tasks forever, while thoses tasks could easily be assigned to any other executor (chosen not randomly).


The proposition is therefore to add a new configuration property to suppress the random shuffling of assignable offers for task.

see this code [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773]
{{
  /**
   * Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
   * overriding in tests, so it can be deterministic.
   */
  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    Random.shuffle(offers)
  }
}}

It could be replaced simply by
{{
    val SKIP_RANDOMIZE_WORKER_OFFERS =  ConfigBuilder("spark.task.skipRandomizeWorkerOffers")
      .version("3.6.0")
      .booleanConf
      .createWithDefault(false)
..

    val skipRandomizeWorkerOffers = conf.get(SKIP_RANDOMIZE_WORKER_OFFERS)

..

  protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
    if (skipRandomizeWorkerOffers) {
       offers
    } else {
       Random.shuffle(offers)
    }
  }
}}



> Add customizable TaskScheduling param, to avoid randomly choosing executor for tasks, and downscale on low micro-batches activity
> ---------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-46659
>                 URL: https://issues.apache.org/jira/browse/SPARK-46659
>             Project: Spark
>          Issue Type: Wish
>          Components: Spark Core
>    Affects Versions: 3.4.0, 3.5.0, 4.0.0
>            Reporter: Arnaud Nauwynck
>            Priority: Minor
>
> When using dynamicAllocation (but not spark.decommission.enabled=true) with a micro-batches activity, very small tasks are arriving at regular interval, and are processed extremely quickly.
> The flow of events that are processed may consume less than 1% of the cpu of the cluster.
> But globally, the number of executors stay at a high level (spark.dynamicAllocation.maxExecutors) eventhough they are all 99% of the time IDDLE.
> Unfortunatly, in the current code, tasks are assigned randomly to executors, so a constant flow of very small tasks maintain artificially in an "active" status all the executors: 
> all executors are receiving tasks from time to time, so strictly speaking, they are never considered as IDDLE during a duration longer than "spark.dynamicAllocation.executorIdleTimeout". 
> Therefore, executors are never marked as candidate for decommissioning, and they continue to receive tasks forever, while thoses tasks could easily be assigned to any other executor (chosen not randomly).
> The proposition is therefore to add a new configuration property to suppress the random shuffling of assignable offers for task.
> see this code [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773|https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L773]
> ```java
>   /**
>    * Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
>    * overriding in tests, so it can be deterministic.
>    */
>   protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
>     Random.shuffle(offers)
>   }
> ```
> It could be replaced simply by
> ```java
>     val SKIP_RANDOMIZE_WORKER_OFFERS =  ConfigBuilder("spark.task.skipRandomizeWorkerOffers")
>       .version("3.6.0")
>       .booleanConf
>       .createWithDefault(false)
> ..
>     val skipRandomizeWorkerOffers = conf.get(SKIP_RANDOMIZE_WORKER_OFFERS)
> ..
>   protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
>     if (skipRandomizeWorkerOffers) {
>        offers
>     } else {
>        Random.shuffle(offers)
>     }
>   }
> ```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org