Posted to reviews@spark.apache.org by "sunchao (via GitHub)" <gi...@apache.org> on 2024/02/06 23:34:18 UTC

[PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

sunchao opened a new pull request, #45052:
URL: https://github.com/apache/spark/pull/45052

   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501741974


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   > The stack trace of the NPE that we saw earlier was part of spark context initialization ... not an access from task, right ?
   
   Thanks @mridulm for checking! I think that stack trace doesn't reveal the root cause of the issue. I added some debugging messages to the code and found the task that was causing the issue:
   
   ```
   setting active env to org.apache.spark.SparkEnv@5ab3ee8b in pool-1-thread-1-ScalaTest-running-JobCancellationSuite
   active env = org.apache.spark.SparkEnv@5ab3ee8b, thread = Executor task launch worker for task 0.0 in stage 0.0 (TID 0)
   java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
   org.apache.spark.storage.BlockManager.memoryManager$lzycompute(BlockManager.scala:210)
   org.apache.spark.storage.BlockManager.memoryManager(BlockManager.scala:204)
   org.apache.spark.storage.BlockManager.memoryStore$lzycompute(BlockManager.scala:248)
   org.apache.spark.storage.BlockManager.memoryStore(BlockManager.scala:247)
   org.apache.spark.scheduler.Task.$anonfun$run$3(Task.scala:146)
   org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1287)
   org.apache.spark.scheduler.Task.run(Task.scala:144)
   org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:633)
   org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
   org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:96)
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:636)
   java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   java.base/java.lang.Thread.run(Thread.java:840)
   memory manager of org.apache.spark.SparkEnv@5ab3ee8b is null, _memoryManager = null, thread = Executor task launch worker for task 0.0 in stage 0.0 (TID 0)
   set memory manager for org.apache.spark.SparkEnv@5ab3ee8b, threadName = pool-1-thread-1-ScalaTest-running-JobCancellationSuite
   java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
   org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
   org.apache.spark.SparkContext.<init>(SparkContext.scala:141)
   org.apache.spark.JobCancellationSuite.$anonfun$new$45(JobCancellationSuite.scala:430)
   org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
   org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
   org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
   org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
   org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
   org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
   org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
   org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
   org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
   org.scalatest.Transformer.apply(Transformer.scala:22)
   org.scalatest.Transformer.apply(Transformer.scala:20)
   org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
   ```
   
   The "setting active env" and "set memory manager" messages are logged during `SparkContext` initialization, while the "active env =" and "memory manager of" messages are logged in `BlockManager` when it tries to access `memoryManager`. The first stack trace shows that this access comes from a separate executor worker thread, and the log order shows that it happens before the memory manager is set.
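
To illustrate the ordering in these logs, here is a minimal Scala sketch that is not taken from the PR. It assumes the `memoryManager` accessor is a `lazy val` that reads a field populated later, which the `memoryManager$lzycompute` frame suggests but does not prove. `MemoryManagerStub`, `LateEnvStub`, `BlockManagerStub`, and `LazyInitRaceSketch` are hypothetical names, not Spark classes.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
final class MemoryManagerStub(val name: String)

final class LateEnvStub {
  // Populated late, e.g. after plugin loading in this sketch.
  @volatile var memoryManager: MemoryManagerStub = null
}

final class BlockManagerStub(env: LateEnvStub) {
  // Resolved on first access and cached from then on, even if the value is null.
  lazy val memoryManager: MemoryManagerStub = env.memoryManager
}

object LazyInitRaceSketch {
  /** Returns (task saw null, value is still null after the env was populated). */
  def accessBeforeInit(): (Boolean, Boolean) = {
    val env = new LateEnvStub
    val blockManager = new BlockManagerStub(env)
    var seenByTask: MemoryManagerStub = new MemoryManagerStub("placeholder")

    // A "task thread" touches the lazy val before the env has a memory manager.
    val task = new Thread(new Runnable {
      def run(): Unit = { seenByTask = blockManager.memoryManager }
    })
    task.start()
    task.join()

    // The "initializing thread" sets the memory manager too late for this block manager.
    env.memoryManager = new MemoryManagerStub("unified")
    (seenByTask == null, blockManager.memoryManager == null)
  }
}
```

Under these assumptions the early access caches `null`, so later readers of the same block manager also see `null` even though the env has been populated by then.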





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1500115713


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I think we should be able to change `shuffleManager` in the same manner to avoid the potential concurrency issue.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501700461


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   @sunchao , I was looking at this again.
   The stack trace of the NPE that we saw earlier was part of spark context initialization ... not an access from task, right ?
   
   Given this, I am not sure whether the hypothesis is valid ... am I missing something here?
   
   (Btw, the line offset is off by 1 - so not sure if anything else changed as well)
   





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501765989


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   @sunchao Can you please take a look [at this](https://github.com/apache/spark/compare/master...mridulm:spark:45052-suggestion-for-sunchao)?
   It should fix the issue we are discussing - the test is for illustration purposes only, please do clean it up :-)
   
   Essentially your fix is in the right direction - the issue is that the `blockManager` referenced in the task cleanup in `finally` is incorrect - as you had fixed.
   The only change I introduced is that the block manager no longer needs to be passed in at the `Task` level - it is simply grabbed at task start time. I also added a test which validates that this is indeed the issue.





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480675486


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -67,7 +67,6 @@ class SparkEnv (
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
     val metricsSystem: MetricsSystem,
-    val memoryManager: MemoryManager,

Review Comment:
   Ack. I also pinged on #43627. If this was already broken in Spark 4.0.0, we don't need to care much.
   - https://github.com/apache/spark/pull/43627#pullrequestreview-1866555431





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480668776


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -67,7 +67,6 @@ class SparkEnv (
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
     val metricsSystem: MetricsSystem,
-    val memoryManager: MemoryManager,

Review Comment:
   Sure. I'm doing something very similar to https://github.com/apache/spark/pull/43627, but I'm happy to add a new constructor.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1500780903


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I don't see how this is failing; can you clarify? The block manager's `initialize` is called on line 632, after the memory manager is initialized in the block manager, so how do we get a null?



##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -77,6 +76,12 @@ class SparkEnv (
 
   def shuffleManager: ShuffleManager = _shuffleManager
 
+  // We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded
+  // to allow the plugin to overwrite memory configurations

Review Comment:
   nit: I assume this is overwriting executor memory configurations; please update the comment.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501353037


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   Updated `Task` to use the block manager from the `SparkEnv` of the `Executor` that creates the task, instead of the one from `SparkEnv.get`. Also switched back to the original approach, similar to the shuffle manager PR. The tests look good now.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501618843


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I think it may also happen in the non-test path, but it should be very rare; this PR only amplified the case. I described the scenario above:
   
   > I'm not sure whether this is a valid case. Looking at the [Task class](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala), it calls SparkEnv.get in several places. However, in the rare cases when a SparkContext has been shut down and a new SparkContext is created, running tasks launched by the former could access SparkEnv created by the latter, which seems not valid. I think this can only happen in the local mode.
   
   The [latest changes](https://github.com/apache/spark/pull/45052/files/a6502e06ef2bf68654f1d3c8bb5c483a071d8770..af68ece323401d06f7e036c3b0782bc80a0f4c93) updated `Task` to use the `BlockManager` from the `Executor` that launches the task, instead of the one from `SparkEnv.get` which could be from a different `SparkContext`.
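
The difference between the two lookups can be sketched in a few lines of Scala. This is a simplified model under stated assumptions, not the code from the PR: `EnvHandle`, `ActiveEnvRegistry`, `ExecutorModel`, and `StaleEnvSketch` are hypothetical names, and the global registry only stands in for the role that `SparkEnv.get` plays in the description above.

```scala
// Hypothetical model for illustration only; these are not Spark classes.
final class EnvHandle(val id: String)

// Stands in for a process-wide "active env" lookup.
object ActiveEnvRegistry {
  @volatile private var current: EnvHandle = null
  def set(env: EnvHandle): Unit = { current = env }
  def get: EnvHandle = current
}

final class ExecutorModel(val env: EnvHandle) {
  // A task that resolves the env globally at the moment it runs.
  def taskUsingGlobalLookup(): () => String =
    () => ActiveEnvRegistry.get.id

  // A task that is pinned to the env of the executor that launched it.
  def taskUsingExecutorEnv(): () => String = {
    val captured = env
    () => captured.id
  }
}

object StaleEnvSketch {
  /** Returns (env seen via global lookup, env seen via the launching executor). */
  def run(): (String, String) = {
    val first = new EnvHandle("context-1")
    ActiveEnvRegistry.set(first)
    val executor = new ExecutorModel(first)
    val globalTask = executor.taskUsingGlobalLookup()
    val pinnedTask = executor.taskUsingExecutorEnv()

    // The first context is stopped and a second one installs its own env
    // while tasks from the first context are still running.
    ActiveEnvRegistry.set(new EnvHandle("context-2"))

    (globalTask(), pinnedTask())
  }
}
```

In this model the globally resolved task ends up on the second context's env, while the pinned task keeps using the env it was launched with.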





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501706435


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I am still looking, but I believe this might be due to a race in `reportHeartBeat` in `Executor`.
   Would like more eyes on it though.
   
   My hypothesis is, something like this is happening:
   
   * scheduler is [started here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L611).
     * In local mode, this directly ends up creating an `Executor`
       * TaskSchedulerImpl.start -> LocalSchedulerBackend.start -> new LocalEndpoint -> new Executor
   * new Executor immediately starts a `heartbeater` - and the initial delay for the thread to start is random, after which it will periodically run.
   * `Executor.reportHeartBeat`, which is invoked, sends a `Heartbeat`.
   * If this `Heartbeat` is received before the driver thread reaches [blockmanager.initialize](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L632), it will end up returning `HeartbeatResponse` with `reregisterBlockManager = true` (since registration of the local block manager happens within `initialize`).
   
   And as part of this, we end up initializing the lazy val memoryManager in BlockManager.
   
   What I am unsure of is, this would normally have triggered an NPE anyway during re-registration if this path is hit (without this PR) - not sure if that is getting ignored (since blockmanager id is null).
   
   (I did not find driver logs for the test run - that would have helped validate this hypothesis - we would have seen "re-registering with master" for this test, before the NPE is thrown)
   
   Thoughts ?
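
The ordering in this hypothesis can be modelled with a small deterministic Scala sketch. It is an assumption-laden illustration, not Spark code: `DriverModel`, `HeartbeatReply`, and `HeartbeatOrderingSketch` are hypothetical names, and the only behaviour modelled is "a heartbeat from an unregistered block manager is answered with a re-register request".

```scala
import scala.collection.mutable

// Hypothetical model for illustration only; these are not Spark classes.
final case class HeartbeatReply(reregisterBlockManager: Boolean)

final class DriverModel {
  private val registered = mutable.Set.empty[String]

  // Stands in for the registration that happens inside initialize().
  def register(executorId: String): Unit = synchronized {
    registered += executorId
  }

  // A heartbeat from an unknown block manager asks the sender to re-register.
  def receiveHeartbeat(executorId: String): HeartbeatReply = synchronized {
    HeartbeatReply(reregisterBlockManager = !registered.contains(executorId))
  }
}

object HeartbeatOrderingSketch {
  /** Returns (re-register requested before init, re-register requested after init). */
  def run(): (Boolean, Boolean) = {
    val driver = new DriverModel
    // The heartbeat arrives before the initializing thread has registered.
    val early = driver.receiveHeartbeat("driver").reregisterBlockManager
    driver.register("driver")
    // The heartbeat arrives after registration.
    val late = driver.receiveHeartbeat("driver").reregisterBlockManager
    (early, late)
  }
}
```

Only the early heartbeat triggers the re-registration path in this model, which is the path the hypothesis suspects of touching `memoryManager` too soon.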
   





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501706435


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I am still looking, but I believe this might be due to a race in `reportHeartBeat` in `Executor`.
   I would like more eyes on it, though.
   
   My hypothesis is that something like this is happening:
   
   * The scheduler is [started here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L611).
     * In local mode, this directly ends up creating an `Executor`:
       * `TaskSchedulerImpl.start` -> `LocalSchedulerBackend.start` -> `new LocalEndpoint` -> `new Executor`
   * The new `Executor` immediately starts a `heartbeater`; the initial delay before the thread starts is random, after which it runs periodically.
   * Each run invokes `Executor.reportHeartBeat`, which sends a `Heartbeat`.
   * If this `Heartbeat` is received before the driver thread reaches [blockmanager.initialize](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L632), the driver ends up returning a `HeartbeatResponse` with `reregisterBlockManager = true` (since registration of the local block manager happens within `initialize`).
   * This triggers a block manager reregistration [here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1241).
   
   I expect this to result in an NPE when the condition is hit, both with this PR (since `memoryManager` is null) and on current master (since `blockManagerId` is null). But the NPE gets ignored, since it is part of the heartbeat (which runs within `logUncaughtExceptions` [here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/Heartbeater.scala#L46)).
   
   The side effect now, however, is that we end up initializing the lazy val `memoryManager` in `BlockManager` ([here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L668)).
   
   
   
   (I did not find driver logs for the test run. They would have helped validate this hypothesis: we would have seen "re-registering with master" for this test before the NPE is thrown.)
   
   Thoughts?
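   
   The hypothesis above can be pictured with a minimal sketch. It assumes hypothetical stand-in classes (`RaceEnv`, `RaceManager`, `HeartbeatRaceSketch`), none of which are Spark classes, and it forces the suspected interleaving with `join()` instead of relying on timing. The point it illustrates is that a Scala `lazy val` caches a null result, so an early access from a swallowed heartbeat failure can still poison a later `initialize()`.
   
   ```scala
   // Hypothetical stand-ins for illustration; these are not Spark classes.
   final class RaceEnv {
     @volatile var memoryManager: String = null // populated later in startup
   }
   
   final class RaceManager(env: RaceEnv) {
     // A lazy val caches its first computed value, even if that value is null.
     lazy val memoryManager: String = env.memoryManager
   
     // Both calls dereference the cached value and throw an NPE if it is null.
     def reregister(): Int = memoryManager.length
     def initialize(): Int = memoryManager.length
   }
   
   object HeartbeatRaceSketch {
     /** Returns (heartbeat NPE was swallowed, later initialize() hit an NPE). */
     def run(): (Boolean, Boolean) = {
       val env = new RaceEnv
       val manager = new RaceManager(env)
   
       // The "heartbeat" runs before the environment is populated; its NPE is swallowed.
       var heartbeatFailed = false
       val heartbeat = new Thread(() => {
         try { manager.reregister(); () }
         catch { case _: NullPointerException => heartbeatFailed = true }
       })
       heartbeat.start()
       heartbeat.join() // forces the suspected interleaving deterministically
   
       env.memoryManager = "unified" // too late: the lazy val already cached null
   
       val initFailed =
         try { manager.initialize(); false }
         catch { case _: NullPointerException => true }
       (heartbeatFailed, initFailed)
     }
   }
   ```
   
   In this sketch both flags come back `true`: the first failure is invisible to the main thread, and the second one surfaces during initialization.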
   





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "tgravescs (via GitHub)" <gi...@apache.org>.
tgravescs commented on PR #45052:
URL: https://github.com/apache/spark/pull/45052#issuecomment-1964315158

   +1




Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao closed pull request #45052: [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded
URL: https://github.com/apache/spark/pull/45052




Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501639578


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I think the change to `Task` also makes sense, though.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501765989


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   @sunchao Can you please take a look [at this](https://github.com/apache/spark/compare/master...mridulm:spark:45052-suggestion-for-sunchao)?
   It should fix the issue we are discussing. The test is for illustration purposes only, so please do adapt and clean it up :-)
   
   Essentially, it is a minor modification to your fix: the issue is that the `blockManager` referenced in the task cleanup in `finally` is incorrect, as you had fixed.
   The only change I introduced is to avoid passing this in at the `Task` level and instead simply grab it at task start time. I also added a test which validates that this is indeed the issue.
   
   
   This also means that, given the risk of `blockManager` being in a potentially inconsistent state until initialization is complete, we have to add some documentation to it, so that this buggy pattern does not get introduced again in the future.
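   
   The difference between re-reading the block manager in `finally` and grabbing it at task start can be sketched as follows. This is only an illustration under stated assumptions: `CleanupTarget`, `ActiveEnv`, and `TaskCleanupSketch` are hypothetical names, and `cleanup()` stands in for whatever the real task cleanup does.
   
   ```scala
   // Hypothetical stand-ins for illustration; these are not Spark's classes.
   final class CleanupTarget(val name: String) {
     var cleanups: Int = 0
     def cleanup(): Unit = { cleanups += 1 }
   }
   
   // Stands in for a globally visible, replaceable environment.
   object ActiveEnv {
     @volatile var blockManager: CleanupTarget = null
   }
   
   object TaskCleanupSketch {
     // Re-reads the global in `finally`, so cleanup may hit a different instance
     // if the environment was replaced while the task body was running.
     def runRereading(body: () => Unit): Unit =
       try body() finally ActiveEnv.blockManager.cleanup()
   
     // Captures the reference at task start, so cleanup hits the same instance
     // the task started with.
     def runCapturing(body: () => Unit): Unit = {
       val blockManager = ActiveEnv.blockManager
       try body() finally blockManager.cleanup()
     }
   }
   ```
   
   If the task body (or another thread) swaps `ActiveEnv.blockManager` mid-run, `runRereading` cleans up the new instance, while `runCapturing` still cleans up the original one.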
   
   
   





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1500029618


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I am not sure I follow the corner case. Can you point me to the test that is causing this issue? Thanks!
   The `BlockManager` being used would be associated with the corresponding `SparkEnv`, and if `SparkEnv` is being mutated, the new env is what we should be referencing.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501639418


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   > it is reasonable for tasks from previous ctx to fail
   
   Hmm, if this is reasonable, then we just need to fix the test case instead (specifically `JobCancellationSuite`) and wait for all tasks to finish after each test.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501060613


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   My suspicion is that after:
   
   ```scala
    _env.initializeMemoryManager(SparkContext.numDriverCores(master, conf))
   ```
   
   in `SparkContext`, `SparkEnv.set` is called by a different thread, which replaces the current `SparkEnv` with a new instance whose `_memoryManager` has not been initialized. Later on, when `maxOnHeapMemory` is evaluated in `BlockManager.initialize`, it goes through:
   
   ```scala
   private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager)
   ```
   
   which will call `SparkEnv.get.memoryManager`. Since the active `SparkEnv` instance has been replaced, `memoryManager` is now null.
   
   I don't know how this would happen though, since it seems `SparkEnv` should be a singleton within a single JVM. I also wasn't able to reproduce this locally.
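   
   To make the suspicion concrete, here is a minimal sketch of a lazy fallback that resolves through a replaceable global. All names (`SwapEnv`, `SwapEnvHolder`, `SwapBlockManager`, `EnvSwapSketch`) are hypothetical and only mimic the shape of the snippet above; this is not Spark's code, and it does not explain how the swap would occur in practice.
   
   ```scala
   // Hypothetical stand-ins for illustration; these are not Spark's classes.
   final class SwapEnv {
     @volatile private var _memoryManager: String = null
     def initializeMemoryManager(): Unit = { _memoryManager = "unified" }
     def memoryManager: String = _memoryManager
   }
   
   // Stands in for a process-wide "current environment" holder.
   object SwapEnvHolder {
     @volatile private var current: SwapEnv = null
     def set(env: SwapEnv): Unit = { current = env }
     def get: SwapEnv = current
   }
   
   final class SwapBlockManager(_memoryManager: String) {
     // Falls back to whichever environment is globally active at first access.
     lazy val memoryManager: String =
       Option(_memoryManager).getOrElse(SwapEnvHolder.get.memoryManager)
   }
   
   object EnvSwapSketch {
     def resolve(swapBeforeAccess: Boolean): String = {
       val first = new SwapEnv
       SwapEnvHolder.set(first)
       first.initializeMemoryManager()
       // Simulates another context installing a fresh, uninitialized environment.
       if (swapBeforeAccess) SwapEnvHolder.set(new SwapEnv)
       new SwapBlockManager(null).memoryManager
     }
   }
   ```
   
   With the swap, the fallback resolves to null even though the first environment was initialized; without it, the initialized value is returned.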





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501700461


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   @sunchao, I was looking at this again.
   The stack trace of the NPE that we saw earlier came from spark context initialization, not from an access in a task, right?
   
   Given this, I am not sure the hypothesis is valid. Am I missing something here?
   





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1500109761


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   Sure. It is "job group with interruption". I think it is flaky, though: it doesn't always happen, and when I tried it locally it did not always reproduce. The job link: https://github.com/sunchao/spark/actions/runs/7923522243/job/21637267400
   
   ```
   [info] - job group with interruption *** FAILED *** (34 milliseconds)
   [info]   java.lang.NullPointerException: Cannot invoke "org.apache.spark.memory.MemoryManager.maxOnHeapStorageMemory()" because the return value of "org.apache.spark.storage.BlockManager.memoryManager()" is null
   [info]   at org.apache.spark.storage.BlockManager.maxOnHeapMemory$lzycompute(BlockManager.scala:243)
   [info]   at org.apache.spark.storage.BlockManager.maxOnHeapMemory(BlockManager.scala:243)
   [info]   at org.apache.spark.storage.BlockManager.initialize(BlockManager.scala:565)
   [info]   at org.apache.spark.SparkContext.<init>(SparkContext.scala:633)
   [info]   at org.apache.spark.SparkContext.<init>(SparkContext.scala:159)
   [info]   at org.apache.spark.SparkContext.<init>(SparkContext.scala:172)
   [info]   at org.apache.spark.JobCancellationSuite.$anonfun$new$41(JobCancellationSuite.scala:397)
   [info]   at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
   [info]   at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
   [info]   at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
   [info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
   ```





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480670318


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -67,7 +67,6 @@ class SparkEnv (
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
     val metricsSystem: MetricsSystem,
-    val memoryManager: MemoryManager,

Review Comment:
   Hmm, actually it might be a bit difficult, since we are changing `memoryManager` from a `val` to a method.
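   
   For readers following along, the `val`-to-method change can be pictured with a small sketch. It is an assumption-laden illustration rather than the PR's code: `DelayedEnvSketch` and the `"memory"` configuration key are hypothetical, and the sketch uses an `Option` with an explicit error where real code might simply hold a null field.
   
   ```scala
   import scala.collection.mutable
   
   // Hypothetical stand-in for illustration; this is not Spark's SparkEnv.
   final class DelayedEnvSketch(conf: mutable.Map[String, String]) {
     private var _memoryManager: Option[String] = None
   
     // Accessor method in place of a constructor `val`.
     def memoryManager: String =
       _memoryManager.getOrElse(
         throw new IllegalStateException("memoryManager is not initialized yet"))
   
     // Reads the configuration only when called, so earlier overrides are honored.
     def initializeMemoryManager(): Unit = {
       val size = conf.getOrElse("memory", "1g")
       _memoryManager = Some("manager(memory=" + size + ")")
     }
   }
   ```
   
   Because the manager is built inside `initializeMemoryManager()`, anything that rewrites the configuration before that call (a plugin, in the PR's motivation) is reflected in the resulting manager.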





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #45052:
URL: https://github.com/apache/spark/pull/45052#issuecomment-1930961472

   I'll add some tests later. Marking as a draft for now to run through all existing tests.




Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #45052:
URL: https://github.com/apache/spark/pull/45052#issuecomment-1964826164

   Thanks @mridulm @dongjoon-hyun @tgravescs for the review! merged to master.




Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501924992


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   Thanks @mridulm. I like your solution, which is simpler. Saving the `blockManager` at the beginning of `Task.run` should be sufficient. Let me adapt the code and the test case in this PR.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501945502


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -77,6 +76,12 @@ class SparkEnv (
 
   def shuffleManager: ShuffleManager = _shuffleManager
 
+  // We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded
+  // to allow the plugin to overwrite memory configurations

Review Comment:
   updated





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501632323


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   Users don't set `SparkEnv` - only Spark does. For tests, we set `SparkEnv` for specific internal scenarios for validation.
   
   As I mentioned, if `SparkContext` was shut down, it is reasonable for tasks from the previous context to fail (they should have been terminated anyway).







Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1498124084


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -77,6 +76,12 @@ class SparkEnv (
 
   def shuffleManager: ShuffleManager = _shuffleManager
 
+  // We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded
+  // to allow the plugin to overwrite memory configurations
+  private var _memoryManager: MemoryManager = _

Review Comment:
   nit: move the definition along with `_shuffleManager` above ?



##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   Do we want to follow the same pattern as what `shuffleManager` does here ?





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1498147851


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -77,6 +76,12 @@ class SparkEnv (
 
   def shuffleManager: ShuffleManager = _shuffleManager
 
+  // We initialize the MemoryManager later in SparkContext after DriverPlugin is loaded
+  // to allow the plugin to overwrite memory configurations
+  private var _memoryManager: MemoryManager = _

Review Comment:
   sure will do.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45052:
URL: https://github.com/apache/spark/pull/45052#issuecomment-1957699454

   +CC @dongjoon-hyun and @tgravescs 




Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501212482


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I was able to reproduce the failure locally after many tries.
   
   So the root cause is that when a task is running, it needs to access the `memoryStore` in a block manager (see [here](86575ad0-8f6f-422f-b10c-ea920f95caf5)). Accessing `memoryStore` in turn calls `memoryManager`, which could be null in the following scenario:
   
   1. a previous test launched a bunch of tasks which may not have finished when the next test starts
   2. the next test starts and calls `new SparkContext`, which initializes everything, including updating the active `SparkEnv` via `SparkEnv.set`
   3. one of the leftover tasks from step 1 accesses `memoryStore` during its `run` method (in a different thread than step 2), which then calls `SparkEnv.get.memoryManager`. However, `SparkEnv.get` returns the new `SparkEnv` instance created in step 2, which at this point may not have its `memoryManager` initialized yet!
   
   I'm not sure whether this is a valid case. Looking at the [`Task` class](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/Task.scala), it calls `SparkEnv.get` in several places. However, in the rare cases when a `SparkContext` has been shut down and a new `SparkContext` is created, running tasks launched by the former could access the `SparkEnv` created by the latter, which does not seem valid. I think this can only happen in local mode.
   
   My latest change doesn't fix the issue either. I guess I just got lucky in the unit tests.
   
   One approach, I think, is not to use `SparkEnv.get` in the `Task` class, but rather to pass in the active environment used by the `Executor`. Let me try this approach.
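   
   The three steps above can be sketched with a toy model. This is only an illustration under stated assumptions: `ToyMemoryManager`, `ToyEnv`, `ToyEnvHolder` and `StaleEnvDemo` are hypothetical stand-ins, not Spark classes, and the steps run sequentially on one thread so the outcome is deterministic.
   
   ```scala
   // Toy model of the race; none of these types are Spark's real classes.
   final class ToyMemoryManager(val name: String)

   final class ToyEnv(val label: String) {
     // Starts out null and is filled in later, mimicking delayed initialization.
     @volatile var memoryManager: ToyMemoryManager = null
   }

   object ToyEnvHolder {
     // Process-wide "active" environment, standing in for SparkEnv.set / SparkEnv.get.
     @volatile private var active: ToyEnv = null
     def set(env: ToyEnv): Unit = { active = env }
     def get: ToyEnv = active
   }

   object StaleEnvDemo {
     /** Returns (what a global lookup sees, what a reference captured at task start sees). */
     def run(): (Option[String], Option[String]) = {
       // Step 1: the first context is fully initialized and a task starts under it.
       val oldEnv = new ToyEnv("old")
       oldEnv.memoryManager = new ToyMemoryManager("old-mm")
       ToyEnvHolder.set(oldEnv)
       val envSeenAtTaskStart = ToyEnvHolder.get

       // Step 2: a new context replaces the active env before its memory manager exists.
       ToyEnvHolder.set(new ToyEnv("new"))

       // Step 3: the leftover task looks up the memory manager again.
       val viaGlobal = Option(ToyEnvHolder.get.memoryManager).map(_.name)
       val viaCaptured = Option(envSeenAtTaskStart.memoryManager).map(_.name)
       (viaGlobal, viaCaptured)
     }
   }
   ```
   
   In this model the global lookup yields nothing while the captured reference still sees the old, initialized memory manager, which is why handing the task the environment it started with avoids the null.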
   
   





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480666428


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -578,6 +578,8 @@ class SparkContext(config: SparkConf) extends Logging {
     // Initialize any plugins before the task scheduler is initialized.
     _plugins = PluginContainer(this, _resources.asJava)
     _env.initializeShuffleManager()
+    _env.initializeMemoryManager(SparkContext.numDriverCores(master, conf))
+

Review Comment:
   nit. extra empty line





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480675486


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -67,7 +67,6 @@ class SparkEnv (
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
     val metricsSystem: MetricsSystem,
-    val memoryManager: MemoryManager,

Review Comment:
   Ack. I also pinged on #43627. If this is already broken in Spark 4.0.0, we don't need to worry much about it.
   - https://github.com/apache/spark/pull/43627#pullrequestreview-1866555431





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480666905


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -67,7 +67,6 @@ class SparkEnv (
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
     val metricsSystem: MetricsSystem,
-    val memoryManager: MemoryManager,

Review Comment:
   Could you avoid this breaking change by adding a new constructor, @sunchao ?





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1480751365


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -67,7 +67,6 @@ class SparkEnv (
     val blockManager: BlockManager,
     val securityManager: SecurityManager,
     val metricsSystem: MetricsSystem,
-    val memoryManager: MemoryManager,

Review Comment:
   Yes, #43627 already requires an API change, so this just builds on top of it.





Re: [PR] [WIP][SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on PR #45052:
URL: https://github.com/apache/spark/pull/45052#issuecomment-1947389764

   Sure, thanks @mridulm in advance!




Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1498365236


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I tried that at the beginning, but found that in certain cases there may be a race condition in:
   
   ```scala
   private lazy val memoryManager = Option(_memoryManager).getOrElse(SparkEnv.get.memoryManager)
   ```
   
   A different thread can call `SparkEnv.set` right after the `memoryManager` is updated in the current `SparkEnv`. As a result, the `memoryManager` could be null. This was revealed by `JobCancellationSuite`.
   
   The current approach makes `memoryManager` a mutable field that is updated later, once the driver plugin is loaded.
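   
   To make the race concrete, here is a minimal sketch of the lazy-fallback pattern. It is a toy model, not Spark code: `FallbackGlobal`, `LazyBlockManager` and `LazyFallbackDemo` are hypothetical names, strings stand in for memory managers, and the interleaving is replayed sequentially rather than with real threads.
   
   ```scala
   // Toy model; these are not Spark's real classes.
   object FallbackGlobal {
     // Stands in for the value reachable through the globally active environment.
     @volatile var memoryManager: String = null
   }

   // `injected` may be null, in which case the first access falls back to the global.
   final class LazyBlockManager(injected: String) {
     lazy val memoryManager: String =
       Option(injected).getOrElse(FallbackGlobal.memoryManager)
   }

   object LazyFallbackDemo {
     def run(): (Option[String], Option[String], Option[String]) = {
       // Good ordering: the global is populated before the first access.
       FallbackGlobal.memoryManager = "mm-1"
       val good = new LazyBlockManager(null)
       val seenByGood = Option(good.memoryManager)

       // Bad ordering: the global has been swapped for an uninitialized one
       // when the first access happens.
       FallbackGlobal.memoryManager = null
       val bad = new LazyBlockManager(null)
       val firstRead = Option(bad.memoryManager) // lazy val is evaluated here and caches null
       FallbackGlobal.memoryManager = "mm-2"     // too late: the cached value does not change
       val secondRead = Option(bad.memoryManager)
       (seenByGood, firstRead, secondRead)
     }
   }
   ```
   
   The point of the sketch is that a `lazy val` resolves once, against whatever the global holds at the moment of first access, so a later initialization cannot repair it.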





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501375088


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   If this is a task from some previous test, which has completed, the exception is benign at best and not a concern for non-test codepaths, right?
   
   If yes, do we need to try to work around this? (Other than by fixing the offending tests, perhaps?)





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501706435


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I am still looking, but I believe this might be due to a race in `reportHeartBeat` in `Executor`.
   Would like more eyes on it though.
   
   My hypothesis is, something like this is happening:
   
   * scheduler is [started here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L611).
     * In local mode, this directly ends up creating an `Executor`
       * TaskSchedulerImpl.start -> LocalSchedulerBackend.start -> new LocalEndpoint -> new Executor
   * new Executor immediately starts a `heartbeater` - and the initial delay for the thread to start is random, after which it will periodically run.
   * `Executor.reportHeartBeat`, which is invoked, sends a `Heartbeat`.
   * If this `Heartbeat` is received before the driver thread reaches [blockmanager.initialize](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/SparkContext.scala#L632), it will end up returning a `HeartbeatResponse` with `reregisterBlockManager = true` (since registration of the local block manager happens within `initialize`).
   * This will trigger blockmanager reregister [here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1241).
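   
   The ordering in this hypothesis can be sketched with a toy model. Everything here is illustrative: `ToyHeartbeatResponse`, `ToyDriver` and `HeartbeatOrderingDemo` are hypothetical names, and the only assumption carried over from the list above is that a heartbeat from a block manager the driver has not registered yet is answered with a request to re-register.
   
   ```scala
   import scala.collection.mutable

   // Toy model of the hypothesis; message and class names are illustrative only.
   final case class ToyHeartbeatResponse(reregisterBlockManager: Boolean)

   final class ToyDriver {
     private val registered = mutable.Set.empty[String]

     // Stands in for the registration that happens inside block manager initialization.
     def registerBlockManager(id: String): Unit = {
       registered += id
       ()
     }

     // A block manager the driver does not know yet is asked to re-register.
     def receiveHeartbeat(id: String): ToyHeartbeatResponse =
       ToyHeartbeatResponse(reregisterBlockManager = !registered.contains(id))
   }

   object HeartbeatOrderingDemo {
     /** Returns the re-register flag for a heartbeat sent before and after registration. */
     def run(): (Boolean, Boolean) = {
       val driver = new ToyDriver
       val early = driver.receiveHeartbeat("driver").reregisterBlockManager
       driver.registerBlockManager("driver")
       val late = driver.receiveHeartbeat("driver").reregisterBlockManager
       (early, late)
     }
   }
   ```
   
   Only the early heartbeat triggers a re-registration in this model, which is the window the hypothesis depends on.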
   
   My hypothesis is that this will result in an NPE when this condition is hit, both for this PR (since `memoryManager` is null) and for current master (since `blockManagerId` is null). But that will get ignored since it is part of the heartbeat.
   
   The side effect now, however, is that we end up initializing the lazy val `memoryManager` in `BlockManager` ([here](https://github.com/apache/spark/blob/af68ece323401d06f7e036c3b0782bc80a0f4c93/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L668)).
   
   
   
   (I did not find driver logs for the test run - that would have helped validate this hypothesis - we would have seen "re-registering with master" for this test, before the NPE is thrown)
   
   Thoughts ?
   





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501739796


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   I will need to look more - I don't think my hypothesis is the root cause here ... even though `_env.blockManager.initialize` happens after `_taskScheduler.start()`, `_env.initializeMemoryManager` happens before it and should have initialized the state.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "sunchao (via GitHub)" <gi...@apache.org>.
sunchao commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501742134


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   Sorry, the line numbers may not match since I added several changes in my local repo for debugging purposes.





Re: [PR] [SPARK-46947][CORE] Delay memory manager initialization until Driver plugin is loaded [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45052:
URL: https://github.com/apache/spark/pull/45052#discussion_r1501765989


##########
core/src/main/scala/org/apache/spark/storage/BlockManager.scala:
##########
@@ -177,15 +177,17 @@ private[spark] class HostLocalDirManager(
  * Manager running on every node (driver and executors) which provides interfaces for putting and
  * retrieving blocks both locally and remotely into various stores (memory, disk, and off-heap).
  *
- * Note that [[initialize()]] must be called before the BlockManager is usable.
+ * Note that [[initialize()]] must be called before the BlockManager is usable. Also, the
+ * `memoryManager` is initialized at a later stage after DriverPlugin is loaded, to allow the
+ * plugin to overwrite memory configurations.
  */
 private[spark] class BlockManager(
     val executorId: String,
     rpcEnv: RpcEnv,
     val master: BlockManagerMaster,
     val serializerManager: SerializerManager,
     val conf: SparkConf,
-    memoryManager: MemoryManager,
+    var memoryManager: MemoryManager,

Review Comment:
   @sunchao Can you please take a look [at this](https://github.com/apache/spark/compare/master...mridulm:spark:45052-suggestion-for-sunchao)?
   It should fix the issue we are discussing. The test is for illustration purposes only, so please do clean it up :-)
   
   Essentially, it is a minor modification to your fix: the issue is that the `blockManager` referenced during task cleanup in the `finally` block is incorrect, as you had fixed.
   The only change I introduced is that it no longer needs to be passed in at the Task level and is simply grabbed at task start time. I also added a test which validates that this is indeed the issue.
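
To make the idea concrete, here is a minimal, self-contained sketch of "grab the manager at task start" versus "look it up again in `finally`". It is not the code from the linked branch. `FakeBlockManager`, `FakeEnv`, `TaskRunner`, and `cleanupTask` are hypothetical stand-ins, and the sketch assumes the instance reachable from the global environment can differ between task start and cleanup.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark's classes.
final class FakeBlockManager(val name: String) {
  // Records which task ids had their cleanup run against this instance.
  var cleanedTasks: List[Long] = Nil
  def cleanupTask(taskId: Long): Unit = {
    cleanedTasks = taskId :: cleanedTasks
  }
}

object FakeEnv {
  // Mutable global: assumed to be replaceable while a task is running.
  @volatile var blockManager: FakeBlockManager = new FakeBlockManager("initial")
}

object TaskRunner {
  // Late lookup: `finally` re-reads the global, so cleanup may run against
  // a different instance than the one the task actually used.
  def runWithLateLookup(taskId: Long)(body: => Unit): Unit = {
    try {
      body
    } finally {
      FakeEnv.blockManager.cleanupTask(taskId)
    }
  }

  // Captured at start: the reference is read once, before the task body,
  // and the same instance is used for cleanup.
  def runWithCapturedManager(taskId: Long)(body: => Unit): Unit = {
    val bm = FakeEnv.blockManager
    try {
      body
    } finally {
      bm.cleanupTask(taskId)
    }
  }
}

object Demo {
  def main(args: Array[String]): Unit = {
    val first = new FakeBlockManager("first")
    FakeEnv.blockManager = first
    TaskRunner.runWithCapturedManager(1L) {
      // Simulate the environment being swapped while the task runs.
      FakeEnv.blockManager = new FakeBlockManager("replacement")
    }
    println(first.cleanedTasks)                // List(1)
    println(FakeEnv.blockManager.cleanedTasks) // List()
  }
}
```

With `runWithLateLookup` in place of `runWithCapturedManager`, the cleanup in this sketch would instead land on the replacement instance, which is the kind of mismatch a test can assert on.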


