You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "maheshk114 (via GitHub)" <gi...@apache.org> on 2024/02/23 03:56:06 UTC

[PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

maheshk114 opened a new pull request, #45228:
URL: https://github.com/apache/spark/pull/45228

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'common/utils/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   This PR exposes a config allowing user to migrate the shuffle data directly to external storage. Changes are made to migrate the data in multiple thread to reduce the migration time. Similarly, the reading of shuffle data from external data is done using multiple thread. Configuration parameter is added to control the number of threads to be used for reading shuffle data.
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   Currently Spark supports migration of shuffle data to peer nodes during node decommissioning. If peer nodes are not accessible, then Spark falls back to external storage. User needs to provide the storage location path. There are scenarios where user may want to migrate to external storage instead of peer nodes. This may be because of unstable nodes or due to the need of aggressive scale down. So, user should be able to configure to migrate the shuffle data directly to external storage if the use case permits. 
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   No
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   New Unit tests are added.
   Made sure that existing tests are passing.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2010337176

   To @maheshk114 , could you rebase this PR to the `master` branch once more to test on top of Apache Hadoop 3.4.0? The following is merged to Apache Spark `master` branch.
   - #45583


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542847717


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   @attilapiros Thanks for taking a look at this PR. In the current code, the shuffle data read from local disk and external storage is handled in same way. It first tries to read from local disk and if it fails then it tries to read from external storage(BlockManager.getLocalBlockData). In this PR I have modified this behavior. The shuffle data written to external storage will be treated as remote block. If the remote block id points to external storage, then the read will be done in a separate thread. This code change is to make sure that, if fallback storage read thread pool is configured, then the fallback storage (external storage) blocks will be processed as remote block or else it will be processed as local block.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1543282253


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   I see that but what I do not get why we have two separate behaviour at the read side. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1544186456


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   Yes that would be a proper fix. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542308319


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-      Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
+    if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {

Review Comment:
   Why the null checks for `conf`? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533281979


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -92,6 +95,57 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val hash = JavaUtils.nonNegativeHash(filename)
     fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
   }
+
+  private val fetchThreadPool: Option[ThreadPoolExecutor] = {
+    val numShuffleThreads = FallbackStorage.getNumReadThreads(conf)
+    if (numShuffleThreads > 0) {
+      logInfo(s"FallbackStorage created thread pool using  ${numShuffleThreads} thread(s)")
+      Some(ThreadUtils.newDaemonCachedThreadPool(
+        "FetchFromFallbackStorage-threadPool", numShuffleThreads))
+    } else {
+      logInfo("FallbackStorage thread pool not created")
+      None
+    }
+  }
+
+  def fetchBlocks(
+      blockManager: BlockManager,
+      blocks: collection.Seq[FetchBlockInfo],
+      address: BlockManagerId,
+      listener: BlockFetchingListener): Unit = {
+    fetchThreadPool match {
+      case Some(p) if !p.isShutdown =>
+        blocks.foreach(block =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              fetchShuffleBlocks(block, blockManager, listener)
+            }
+          })
+        )
+      case _ =>
+        logInfo(s" fetchThreadPool does not exists for $address or shutdown")
+        blocks.foreach(block => fetchShuffleBlocks(block, blockManager, listener))
+    }
+  }
+
+  private def fetchShuffleBlocks(
+              block: FetchBlockInfo,

Review Comment:
   indentation.



##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -92,6 +95,57 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val hash = JavaUtils.nonNegativeHash(filename)
     fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
   }
+
+  private val fetchThreadPool: Option[ThreadPoolExecutor] = {
+    val numShuffleThreads = FallbackStorage.getNumReadThreads(conf)
+    if (numShuffleThreads > 0) {
+      logInfo(s"FallbackStorage created thread pool using  ${numShuffleThreads} thread(s)")
+      Some(ThreadUtils.newDaemonCachedThreadPool(
+        "FetchFromFallbackStorage-threadPool", numShuffleThreads))
+    } else {
+      logInfo("FallbackStorage thread pool not created")
+      None
+    }
+  }
+
+  def fetchBlocks(
+      blockManager: BlockManager,
+      blocks: collection.Seq[FetchBlockInfo],
+      address: BlockManagerId,
+      listener: BlockFetchingListener): Unit = {
+    fetchThreadPool match {
+      case Some(p) if !p.isShutdown =>
+        blocks.foreach(block =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              fetchShuffleBlocks(block, blockManager, listener)
+            }
+          })
+        )
+      case _ =>
+        logInfo(s" fetchThreadPool does not exists for $address or shutdown")
+        blocks.foreach(block => fetchShuffleBlocks(block, blockManager, listener))
+    }
+  }
+
+  private def fetchShuffleBlocks(
+              block: FetchBlockInfo,

Review Comment:
   indentation?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1544106977


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   I found where it was first introduced:
   https://github.com/apache/spark/pull/30492/files#r532833817
   
   @dongjoon-hyun can an externally stored block ever be found locally?
   
   If not I would prefer to see a separate case for the external blocks and count them separately too.
   So we would have the following block types:
   - local
   - host-local
   - push-merged-local
   - remote
   - external
   
   @maheshk114, @dongjoon-hyun WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1543266045


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -92,6 +95,51 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val hash = JavaUtils.nonNegativeHash(filename)
     fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
   }
+
+  private val fetchThreadPool: Option[ThreadPoolExecutor] = {
+    val numShuffleThreads = FallbackStorage.getNumReadThreads(conf)
+    if (numShuffleThreads > 0) {
+      logInfo(s"FallbackStorage created thread pool using  ${numShuffleThreads} thread(s)")
+      Some(ThreadUtils.newDaemonCachedThreadPool(
+        "FetchFromFallbackStorage-threadPool", numShuffleThreads))
+    } else {
+      logInfo("FallbackStorage thread pool not created")
+      None
+    }
+  }
+
+  def fetchBlocks(
+      blockManager: BlockManager,
+      blocks: collection.Seq[FetchBlockInfo],
+      address: BlockManagerId,
+      listener: BlockFetchingListener): Unit = {
+    fetchThreadPool match {
+      case Some(p) if !p.isShutdown =>
+        blocks.foreach(block =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              fetchShuffleBlocks(block, blockManager, listener)
+            }
+          })
+        )
+      case _ =>
+        logInfo(s" fetchThreadPool does not exists for $address or shutdown")
+        blocks.foreach(block => fetchShuffleBlocks(block, blockManager, listener))
+    }
+  }
+
+  private def fetchShuffleBlocks(
+      block: FetchBlockInfo,
+      blockManager: BlockManager,

Review Comment:
   Why `blockManager` is passed if never used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2004945963

   Sorry for missing pings here, @mridulm . Let me take a look during this week.
   
   FYI, I was thinking that Apache Hadoop 3.4 and S3 Express One Zone feature was the minimum limitation of this efforts because historically these kind of previous effort didn't deliver desirable benefits due to the inevitable slowness.
   
   I'll try quickly when I have some time and share the opinion here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2005138516

   Thanks @dongjoon-hyun :-)
    (And I forgot I had pinged you before, sorry about that !)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533281749


##########
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:
##########
@@ -354,7 +355,8 @@ private[spark] class CoarseGrainedExecutorBackend(
                 // We can only trust allBlocksMigrated boolean value if there were no tasks running
                 // since the start of computing it.
                 if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
-                  logInfo("No running tasks, all blocks migrated, stopping.")
+                  val timeTakenMs = (System.nanoTime() - startTime) / (1000 * 1000)
+                  logInfo(s"No running tasks, all blocks migrated in $timeTakenMs ms, stopping.")

Review Comment:
   Please revert the log-related change of this file because it's irrelevant to the functionality. We can handle this separately if needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542308491


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-      Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
+    if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      if (fallbackStorage.isDefined) {
+        val fallbackPath = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get
+        if (fallbackPath.equals(fallbackStorage.get.fallbackPath.toString)) {
+          logDebug(s"FallbackStorage defined with path $fallbackPath")
+          fallbackStorage
+        } else {
+          // for unit test.
+          Some(new FallbackStorage(conf))
+        }
+      } else {
+        fallbackStorage = Some(new FallbackStorage(conf))
+        logInfo(s"Created FallbackStorage $fallbackStorage")
+        fallbackStorage
+      }
     } else {
       None
     }
   }
 
+  def getNumReadThreads(conf: SparkConf): Int = {
+    val numShuffleThreads =
+      if (conf == null) None else conf.get(STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ)

Review Comment:
   Same here:
   Why the null checks for `conf`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1545941169


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   @dongjoon-hyun Do you think it can be  modified this way ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1535382480


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -551,6 +551,22 @@ package object config {
       .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
       .createOptional
 
+  private[spark] val STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ =
+    ConfigBuilder("spark.storage.fallbackStorage.num.threads.for.shuffle.read")

Review Comment:
   done



##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -147,6 +222,14 @@ private[spark] object FallbackStorage extends Logging {
     }
   }
 
+  def stopThreadPool(conf: SparkConf): Unit = {
+    logInfo(s" Stopping thread pool")
+    if (getFallbackStorage(conf).isDefined &&
+      getFallbackStorage(conf).get.fetchThreadPool.isDefined) {

Review Comment:
   done



##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -92,6 +95,57 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val hash = JavaUtils.nonNegativeHash(filename)
     fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
   }
+
+  private val fetchThreadPool: Option[ThreadPoolExecutor] = {
+    val numShuffleThreads = FallbackStorage.getNumReadThreads(conf)
+    if (numShuffleThreads > 0) {
+      logInfo(s"FallbackStorage created thread pool using  ${numShuffleThreads} thread(s)")
+      Some(ThreadUtils.newDaemonCachedThreadPool(
+        "FetchFromFallbackStorage-threadPool", numShuffleThreads))
+    } else {
+      logInfo("FallbackStorage thread pool not created")
+      None
+    }
+  }
+
+  def fetchBlocks(
+      blockManager: BlockManager,
+      blocks: collection.Seq[FetchBlockInfo],
+      address: BlockManagerId,
+      listener: BlockFetchingListener): Unit = {
+    fetchThreadPool match {
+      case Some(p) if !p.isShutdown =>
+        blocks.foreach(block =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              fetchShuffleBlocks(block, blockManager, listener)
+            }
+          })
+        )
+      case _ =>
+        logInfo(s" fetchThreadPool does not exists for $address or shutdown")
+        blocks.foreach(block => fetchShuffleBlocks(block, blockManager, listener))
+    }
+  }
+
+  private def fetchShuffleBlocks(
+              block: FetchBlockInfo,

Review Comment:
   done



##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -172,61 +172,64 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("migrate shuffle data to fallback storage") {
-    val conf = new SparkConf(false)

Review Comment:
   The test is not removed. I have just added a new conf to avoid duplicating the same test case.



##########
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala:
##########
@@ -354,7 +355,8 @@ private[spark] class CoarseGrainedExecutorBackend(
                 // We can only trust allBlocksMigrated boolean value if there were no tasks running
                 // since the start of computing it.
                 if (allBlocksMigrated && (migrationTime > lastTaskFinishTime.get())) {
-                  logInfo("No running tasks, all blocks migrated, stopping.")
+                  val timeTakenMs = (System.nanoTime() - startTime) / (1000 * 1000)
+                  logInfo(s"No running tasks, all blocks migrated in $timeTakenMs ms, stopping.")

Review Comment:
   This log line can be used to compare the time taken to migrate to external storage and remote node. This may be helpful to user to check the increase in migration time.



##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -1558,6 +1587,7 @@ object ShuffleBlockFetcherIterator {
       mapIndex: Int,
       address: BlockManagerId,
       size: Long,
+      timeTaken: Long,

Review Comment:
   This is not just for logging but to capture the time taken by read operations. This was helpful in comparing the time diff between read from external storage and remote node.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533284633


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -1558,6 +1587,7 @@ object ShuffleBlockFetcherIterator {
       mapIndex: Int,
       address: BlockManagerId,
       size: Long,
+      timeTaken: Long,

Review Comment:
   May I ask the reason why we add a new filed here? If this is only for pure logging, please revert this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2011167980

   > To @maheshk114 , I have a big concern on the AS-IS PR title because it is misleading the users by ignoring the existing Apache Spark 3.2 features.
   > 
   > > [SPARK-47141] [Core]: Support shuffle migration to external storage.
   > 
   > At least, as the author of the following Apache Spark 3.2 patches, I believe Apache Spark "Support Shuffle Migration to External Storage" already. WDTY, @maheshk114 ? Could you revise the PR title to narrow down to this PR's exact contribution?
   > 
   > * [[SPARK-33545][CORE] Support Fallback Storage during Worker decommission #30492](https://github.com/apache/spark/pull/30492)
   > * [[SPARK-34142][CORE] Support Fallback Storage Cleanup during stopping SparkContext #31215](https://github.com/apache/spark/pull/31215)
   
   Thanks a lot @dongjoon-hyun for looking into the PR. Yes, it make sense. I have updated the PR title to reflect the exact changes done in this PR.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2011171597

   > To @maheshk114 , could you rebase this PR to the `master` branch once more to test on top of Apache Hadoop 3.4.0? The following is merged to Apache Spark `master` branch.
   > 
   > * [[SPARK-45393][BUILD] Upgrade Hadoop to 3.4.0 #45583](https://github.com/apache/spark/pull/45583)
   
   done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533282286


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -147,6 +222,14 @@ private[spark] object FallbackStorage extends Logging {
     }
   }
 
+  def stopThreadPool(conf: SparkConf): Unit = {
+    logInfo(s" Stopping thread pool")
+    if (getFallbackStorage(conf).isDefined &&
+      getFallbackStorage(conf).get.fetchThreadPool.isDefined) {

Review Comment:
   indentation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2004821968

   +CC @dongjoon-hyun, @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542850643


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-      Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
+    if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
+      if (fallbackStorage.isDefined) {
+        val fallbackPath = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get
+        if (fallbackPath.equals(fallbackStorage.get.fallbackPath.toString)) {
+          logDebug(s"FallbackStorage defined with path $fallbackPath")
+          fallbackStorage
+        } else {
+          // for unit test.
+          Some(new FallbackStorage(conf))
+        }
+      } else {
+        fallbackStorage = Some(new FallbackStorage(conf))
+        logInfo(s"Created FallbackStorage $fallbackStorage")
+        fallbackStorage
+      }
     } else {
       None
     }
   }
 
+  def getNumReadThreads(conf: SparkConf): Int = {
+    val numShuffleThreads =
+      if (conf == null) None else conf.get(STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ)

Review Comment:
   to fix UT failures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542306911


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   @maheshk114 can you explain why this change is needed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1543240507


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-      Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
+    if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {

Review Comment:
   I would prefer to have the fix in the unit test and not in the production code. Otherwise the next developer touching this code will be uncertain why having null checks for the config here and there and he might keep adding null checks to his new code. Moreover if the config would be optional it should be an `Option` but this mixed solution is a bit misleading.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2016367450

   > I finished the first round review, @maheshk114 .
   > 
   > BTW, could you share some performance result of this contribution on your system? Please add them to the PR description.
   
   I have a internal setup with 3.4 ..will share the number on that.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533285590


##########
core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala:
##########
@@ -172,61 +172,64 @@ class FallbackStorageSuite extends SparkFunSuite with LocalSparkContext {
   }
 
   test("migrate shuffle data to fallback storage") {
-    val conf = new SparkConf(false)

Review Comment:
   Please preserve the existing test coverage as much as possible. We had better add new feature test coverage separately while keeping the existing test coverage unchanged in order to prevent any regression.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2027517275

   I am also concerned about the performance.
    
   I think the best would be if the migration of shuffle data to external storage would only kick in when the scale down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) and the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for the ratio.
   
   @maheshk114 WDYT? 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1544017626


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   The original code was written to read the local shuffle data in the same thread to avoid any thread creation overhead. So i wanted to keep the same behavior for local read and use a thread pool for read from external storage.



##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -92,6 +95,51 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
     val hash = JavaUtils.nonNegativeHash(filename)
     fallbackFileSystem.exists(new Path(fallbackPath, s"$appId/$shuffleId/$hash/$filename"))
   }
+
+  private val fetchThreadPool: Option[ThreadPoolExecutor] = {
+    val numShuffleThreads = FallbackStorage.getNumReadThreads(conf)
+    if (numShuffleThreads > 0) {
+      logInfo(s"FallbackStorage created thread pool using  ${numShuffleThreads} thread(s)")
+      Some(ThreadUtils.newDaemonCachedThreadPool(
+        "FetchFromFallbackStorage-threadPool", numShuffleThreads))
+    } else {
+      logInfo("FallbackStorage thread pool not created")
+      None
+    }
+  }
+
+  def fetchBlocks(
+      blockManager: BlockManager,
+      blocks: collection.Seq[FetchBlockInfo],
+      address: BlockManagerId,
+      listener: BlockFetchingListener): Unit = {
+    fetchThreadPool match {
+      case Some(p) if !p.isShutdown =>
+        blocks.foreach(block =>
+          p.submit(new Runnable {
+            override def run(): Unit = {
+              fetchShuffleBlocks(block, blockManager, listener)
+            }
+          })
+        )
+      case _ =>
+        logInfo(s" fetchThreadPool does not exists for $address or shutdown")
+        blocks.foreach(block => fetchShuffleBlocks(block, blockManager, listener))
+    }
+  }
+
+  private def fetchShuffleBlocks(
+      block: FetchBlockInfo,
+      blockManager: BlockManager,

Review Comment:
   In my first commit, the logic was to read from local disk first and try external storage only in case of IO exception. I removed that logic but forgot to remove the blockManager. Will remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1544059249


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-      Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
+    if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {

Review Comment:
   make sense. Will modify accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "abhishekd0907 (via GitHub)" <gi...@apache.org>.
abhishekd0907 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-1993902079

   Hi @mridulm  @attilapiros , Can you please help in reviewing this PR?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2031012182

   > > > I am also concerned about the performance.
   > > > I think the best would be if the migration of shuffle data to external storage would only kick in when the scale down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) and the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for the ratio.
   > > > @maheshk114 WDYT?
   > > 
   > > 
   > > Its not only performance but also useful when the nodes are not very reliable. So I think we should have a Boolean flag also to allow user to chose to migrate the shuffle directly to external storage.
   > 
   > With a threshold you can force to do the migration to the external storage every time (or even never) so the flag won't be needed.
   
   @attilapiros This will work for our use case, as we will be setting it either to 0% or 100%. In normal scenarios, it should be set to more than 50%. That means when more than 50% of executors are decommissioning, we can enable migrating to external storage. But as of now Spark decommissions the  executor as and when the executor is found idle for more than idle timeout or for some reason cluster manager wants to decommission the node. So there is no way to find, how many total executors are going to be decommissioned in advance. With this logic, we will start migrating to peer and later when the number reaches the threshold we will start migrating to external storage. This will result into extra movement of shuffle which we want to avoid in the first place.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2004949098

   I assigned myself not to forget.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542849547


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -110,14 +158,35 @@ private[spark] object FallbackStorage extends Logging {
   /** We use one block manager id as a place holder. */
   val FALLBACK_BLOCK_MANAGER_ID: BlockManagerId = BlockManagerId("fallback", "remote", 7337)
 
-  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = {
-    if (conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {
-      Some(new FallbackStorage(conf))
+  // There should be only one fallback storage thread pool per executor.
+  var fallbackStorage: Option[FallbackStorage] = None
+  def getFallbackStorage(conf: SparkConf): Option[FallbackStorage] = this.synchronized {
+    if (conf != null && conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).isDefined) {

Review Comment:
   Some unit tests scenario, the conf is not initialized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533283117


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -188,15 +271,15 @@ private[spark] object FallbackStorage extends Logging {
         val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
         val hash = JavaUtils.nonNegativeHash(name)
         val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
-        val f = fallbackFileSystem.open(dataFile)
         val size = nextOffset - offset
         logDebug(s"To byte array $size")
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
-        f.seek(offset)
-        f.readFully(array)
-        logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
-        f.close()
+        Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
+          f.seek(offset)
+          f.readFully(array)
+          logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
+        }

Review Comment:
   Could you spin off this `Utils.tryWithResource` related change to a new JIRA? We can merge that first.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1535339647


##########
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala:
##########
@@ -188,15 +271,15 @@ private[spark] object FallbackStorage extends Logging {
         val name = ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID).name
         val hash = JavaUtils.nonNegativeHash(name)
         val dataFile = new Path(fallbackPath, s"$appId/$shuffleId/$hash/$name")
-        val f = fallbackFileSystem.open(dataFile)
         val size = nextOffset - offset
         logDebug(s"To byte array $size")
         val array = new Array[Byte](size.toInt)
         val startTimeNs = System.nanoTime()
-        f.seek(offset)
-        f.readFully(array)
-        logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
-        f.close()
+        Utils.tryWithResource(fallbackFileSystem.open(dataFile)) { f =>
+          f.seek(offset)
+          f.readFully(array)
+          logDebug(s"Took ${(System.nanoTime() - startTimeNs) / (1000 * 1000)}ms")
+        }

Review Comment:
   @dongjoon-hyun  https://github.com/apache/spark/pull/45663



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2029191559

   > > I am also concerned about the performance.
   > > I think the best would be if the migration of shuffle data to external storage would only kick in when the scale down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) and the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for the ratio.
   > > @maheshk114 WDYT?
   > 
   > Its not only performance but also useful when the nodes are not very reliable. So I think we should have a Boolean flag also to allow user to chose to migrate the shuffle directly to external storage.
   
   With a threshold you can force to do the migration to the external storage every time (or even never) so the flag won't be needed.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "attilapiros (via GitHub)" <gi...@apache.org>.
attilapiros commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1542306911


##########
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala:
##########
@@ -398,8 +410,13 @@ final class ShuffleBlockFetcherIterator(
     var pushMergedLocalBlockBytes = 0L
     val prevNumBlocksToFetch = numBlocksToFetch
 
-    val fallback = FallbackStorage.FALLBACK_BLOCK_MANAGER_ID.executorId
-    val localExecIds = Set(blockManager.blockManagerId.executorId, fallback)
+    // Fallback to original implementation, if thread pool is not enabled.
+    val localExecIds = if (FallbackStorage.getNumReadThreads(blockManager.conf) > 0) {

Review Comment:
   @maheshk114 can you please explain why this change is needed? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2029095673

   > I am also concerned about the performance.
   > 
   > I think the best would be if the migration of shuffle data to external storage would only kick in when the scale down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) and the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for the ratio.
   > 
   > @maheshk114 WDYT?
   
   Its not only performance but also useful when the nodes are not very reliable. So I think we should have a Boolean flag also to allow user to chose to migrate the shuffle directly to external storage.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533274587


##########
core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala:
##########
@@ -447,7 +462,30 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT
     assert(!iterator.hasNext)
   }
 
-  test("fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads") {
+  def createShuffleFile(shuffleId: Int, mapId: Int, reducerId: Int, conf: SparkConf): Unit = {
+    var name = ShuffleIndexBlockId(shuffleId, mapId, reducerId).name
+    var hash = JavaUtils.nonNegativeHash(name)
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
+    val appId = conf.getAppId
+    val path: String = conf.get(STORAGE_DECOMMISSION_FALLBACK_STORAGE_PATH).get
+    val indexFile = new Path(path, s"$appId/$shuffleId/$hash/$name")
+    name = ShuffleDataBlockId(shuffleId, mapId, reducerId).name
+    hash = JavaUtils.nonNegativeHash(name)
+    val dataFile = new Path(path, s"$appId/$shuffleId/$hash/$name")
+    val fallbackFileSystem = org.apache.hadoop.fs.FileSystem.get(indexFile.toUri, hadoopConf)
+    val indexOut = fallbackFileSystem.create(indexFile)
+    indexOut.writeLong(0L) // offset
+    indexOut.writeLong(10L) // next offset
+    indexOut.writeLong(10L) // next offset
+    indexOut.writeLong(10L) // next offset
+    val dataOut = fallbackFileSystem.create(dataFile)
+    dataOut.writeBytes("some data to write")
+    indexOut.close()
+    dataOut.close()
+  }
+
+  test("fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads " +
+    "+ 2 read from external storage") {

Review Comment:
   This PR seems to introduce a test failure (or flakiness) at this test case. Could you take a look at the CI failure?
   - https://github.com/maheshk114/spark/actions/runs/8369424178/job/22915159186
   ```
   [info] ShuffleBlockFetcherIteratorSuite:
   ...
   [info] - fetch continuous blocks in batch successful 3 local + 4 host local + 2 remote reads + 2 read from external storage *** FAILED *** (107 milliseconds)
   [info]   org.mockito.exceptions.verification.TooFewActualInvocations: blockManager.getLocalBlockData(<any>);
   [info] Wanted 2 times:
   [info] -> at org.apache.spark.storage.BlockManager.getLocalBlockData(BlockManager.scala:721)
   [info] But was 1 time:
   [info] -> at org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchLocalBlocks(ShuffleBlockFetcherIterator.scala:592)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45228:
URL: https://github.com/apache/spark/pull/45228#discussion_r1533275990


##########
core/src/main/scala/org/apache/spark/internal/config/package.scala:
##########
@@ -551,6 +551,22 @@ package object config {
       .checkValue(_.endsWith(java.io.File.separator), "Path should end with separator.")
       .createOptional
 
+  private[spark] val STORAGE_FALLBACK_STORAGE_NUM_THREADS_FOR_SHUFFLE_READ =
+    ConfigBuilder("spark.storage.fallbackStorage.num.threads.for.shuffle.read")

Review Comment:
   Every `.` means a new namespace in Apache Spark configuration namespace scheme. For example, this line introduces 4 new namespaces. Please avoid using `.`.
   - spark.storage.fallbackStorage.num.*
   - spark.storage.fallbackStorage.num.threads.*
   - spark.storage.fallbackStorage.num.threads.for.*
   - spark.storage.fallbackStorage.num.threads.for.shuffle.*



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2011254687

   Thank you for rebasing, @maheshk114 .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141][CORE] Support enabling migration of shuffle data directly to external storage using config parameter. [spark]

Posted by "maheshk114 (via GitHub)" <gi...@apache.org>.
maheshk114 commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-2029934410

   > I am also concerned about the performance.
   > 
   > I think the best would be if the migration of shuffle data to external storage would only kick in when the scale down is aggressive. This can be decided by checking the ratio of the number of available peers (non-decommissioning executors) and the number of decommissioning executors. In that case the parameter would not be a single boolean flag but a threshold for the ratio.
   
   Yes. Will modify accordingly.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47141] [Core]: Support shuffle migration to external storage. [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45228:
URL: https://github.com/apache/spark/pull/45228#issuecomment-1962423323

   +CC @dongjoon-hyun , @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org