Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/12/30 19:12:00 UTC

[GitHub] [spark] vanzin commented on a change in pull request #27010: [SPARK-30313][CORE] Ensure EndpointRef is available MasterWebUI/WorkerPage

URL: https://github.com/apache/spark/pull/27010#discussion_r362071308
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
 ##########
 @@ -68,11 +82,10 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
       if (stopped) {
         throw new IllegalStateException("RpcEnv has been stopped")
       }
-      if (endpoints.putIfAbsent(name, getMessageLoop(name, endpoint)) != null) {
+      if (endpoints.putIfAbsent(name, assignToMessageLoop(name, endpoint, endpointRef)) != null) {
 
 Review comment:
  While this solves the issue, I don't think it's quite right. The error path is wrong: `assignToMessageLoop` mutates `endpointRefs` and, more importantly, the message loop, and it is called here regardless of whether the endpoint should be registered. So when the name is already taken, both have already been modified by the time the error is raised.
   
   To be fair, the previous code also has that problem w.r.t. the message loop being modified.
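  To make the failure mode concrete, here is a small toy model of the pattern in the diff. All names (`EagerRegistration`, `ToyLoop`, `assignToLoop`) are hypothetical stand-ins, not Spark's classes; the only point is that the argument to `putIfAbsent` is evaluated, with its side effects, before the map decides whether the name is free.

  ```scala
  import java.util.concurrent.ConcurrentHashMap
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical toy model (not Spark's Dispatcher) of the reviewed pattern.
  object EagerRegistration {
    final class ToyLoop {
      // Records every name the loop was told about.
      val registered = ArrayBuffer.empty[String]
    }

    val sharedLoop = new ToyLoop
    val endpoints = new ConcurrentHashMap[String, ToyLoop]()
    val endpointRefs = new ConcurrentHashMap[String, String]()

    // Mutates endpointRefs and the loop unconditionally, like the
    // assignToMessageLoop call in the diff.
    private def assignToLoop(name: String, ref: String): ToyLoop = {
      endpointRefs.put(name, ref)
      sharedLoop.registered += name
      sharedLoop
    }

    def register(name: String, ref: String): Unit = synchronized {
      // assignToLoop runs before putIfAbsent checks for an existing entry.
      if (endpoints.putIfAbsent(name, assignToLoop(name, ref)) != null) {
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
    }
  }
  ```

  Registering `"worker"` twice makes the second call throw, yet `endpointRefs` now points at the rejected ref and the shared loop has seen `"worker"` twice.
  
  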
   
   I think it would be safe here to have something like:
   
   ```
   def findMessageLoop(endpoint) = {
     // return the right message loop without modification
   }
   
   val messageLoop = findMessageLoop(endpoint)
   if (endpoints.putIfAbsent(name, messageLoop) != null) {
     throw
   }
   endpointRefs.put(...)
   messageLoop.register(...)
   ```
   
  If this is done inside the synchronized block, it seems safe and solves the problem. `DedicatedMessageLoop` should also implement `register` and call `setActive` there, instead of in its constructor. One more small thing: `DedicatedMessageLoop` will leak a thread pool here in the error case, so maybe the thread pool should also be created in the `register` implementation.
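  A minimal sketch of that split, using hypothetical names (`ToyMessageLoop`, `ToyDedicatedLoop`): the constructor has no side effects, and both the thread pool and the "active" state are set up only in `register()`. The `active` flag merely stands in for the `setActive` call; this is an illustration under those assumptions, not the actual `DedicatedMessageLoop`.

  ```scala
  import java.util.concurrent.{ExecutorService, Executors, TimeUnit}

  // Hypothetical loop interface: registration is a separate step from construction.
  trait ToyMessageLoop {
    def register(name: String): Unit
    def stop(): Unit
  }

  final class ToyDedicatedLoop extends ToyMessageLoop {
    // Not created in the constructor, so a rejected registration leaks nothing.
    @volatile private var pool: ExecutorService = null
    // Stand-in for the inbox being marked active via setActive.
    @volatile var active: Boolean = false

    def poolCreated: Boolean = pool != null

    override def register(name: String): Unit = {
      pool = Executors.newSingleThreadExecutor()
      active = true
    }

    override def stop(): Unit = {
      if (pool != null) {
        pool.shutdown()
        pool.awaitTermination(5, TimeUnit.SECONDS)
      }
      active = false
    }
  }
  ```

  Constructing a `ToyDedicatedLoop` and then discarding it (the error case) allocates no pool at all.
  
  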
   
  In fact, since this is inside a synchronized block anyway, you can simplify this further by not using `putIfAbsent`: check with `containsKey` and throw if the name already exists; then find the right loop, put it in `endpoints`, update `endpointRefs`, and finally call `register()`. You'll still need `DedicatedMessageLoop.register()` to call `setActive()` at the right time.
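  The `containsKey` ordering can be sketched with another toy model. Again, `CheckedRegistration`, `ToyLoop`, and `findLoop` are hypothetical names chosen for illustration: the check happens first, the loop lookup has no side effects, and all mutation happens only after the name has been accepted.

  ```scala
  import java.util.concurrent.ConcurrentHashMap
  import scala.collection.mutable.ArrayBuffer

  // Hypothetical toy dispatcher (not Spark's Dispatcher) following the
  // suggested order: check, find loop, record state, then register.
  object CheckedRegistration {
    final class ToyLoop(val dedicated: Boolean) {
      val registered = ArrayBuffer.empty[String]
      def register(name: String): Unit = registered += name
    }

    val sharedLoop = new ToyLoop(dedicated = false)
    val endpoints = new ConcurrentHashMap[String, ToyLoop]()
    val endpointRefs = new ConcurrentHashMap[String, String]()

    // Side-effect-free lookup: the shared loop, or a fresh, inert dedicated one.
    private def findLoop(isolated: Boolean): ToyLoop =
      if (isolated) new ToyLoop(dedicated = true) else sharedLoop

    def registerEndpoint(name: String, ref: String, isolated: Boolean = false): Unit = synchronized {
      if (endpoints.containsKey(name)) {
        // Nothing has been mutated yet, so the error path leaves no trace.
        throw new IllegalArgumentException(s"There is already an endpoint called $name")
      }
      val loop = findLoop(isolated)
      endpoints.put(name, loop)
      endpointRefs.put(name, ref)
      loop.register(name)
    }
  }
  ```

  Because every step runs under the same lock, the `containsKey` check and the later `put` cannot interleave with another registration, which is what makes dropping `putIfAbsent` safe here.
  
  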
