Posted to issues@spark.apache.org by "xukun (JIRA)" <ji...@apache.org> on 2016/09/18 08:57:20 UTC

[jira] [Updated] (SPARK-17582) Error Executor info in SparkUI ExecutorsTab

     [ https://issues.apache.org/jira/browse/SPARK-17582?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

xukun updated SPARK-17582:
--------------------------
    Description: 
When an executor is lost, the SparkUI ExecutorsTab still shows its executor info.

In HeartbeatReceiver.scala:
```
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {

    // Messages sent and received locally
    case ExecutorRegistered(executorId) =>
      executorLastSeen(executorId) = clock.getTimeMillis()
      context.reply(true)
    case ExecutorRemoved(executorId) =>
      executorLastSeen.remove(executorId)
      context.reply(true)
    case TaskSchedulerIsSet =>
      scheduler = sc.taskScheduler
      context.reply(true)
    case ExpireDeadHosts =>
      expireDeadHosts()
      context.reply(true)

    // Messages received from executors
    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
      if (scheduler != null) {
        if (executorLastSeen.contains(executorId)) {
          executorLastSeen(executorId) = clock.getTimeMillis()
          eventLoopThread.submit(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              val unknownExecutor = !scheduler.executorHeartbeatReceived(
                executorId, taskMetrics, blockManagerId)
              val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
              context.reply(response)
            }
          })
        } else {
          // This may happen if we get an executor's in-flight heartbeat immediately
          // after we just removed it. It's not really an error condition so we should
          // not log warning here. Otherwise there may be a lot of noise especially if
          // we explicitly remove executors (SPARK-4134).
          logDebug(s"Received heartbeat from unknown executor $executorId")
          context.reply(HeartbeatResponse(reregisterBlockManager = false))
        }
      } else {
        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
        // case rarely happens. However, if it really happens, log it and ask the executor to
        // register itself again.
        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
        context.reply(HeartbeatResponse(reregisterBlockManager = true))
      }
  }
```


HeartbeatReceiver receives two messages here: Heartbeat and ExecutorRemoved.
If they are processed in the following order:
1. Heartbeat is processed, but the task submitted to eventLoopThread has not yet returned a result.
2. ExecutorRemoved is processed.
then the variable unknownExecutor will be true, which sets reregisterBlockManager in the response.
The result is that the executor is lost but its blockManager is still alive.
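
The ordering above can be reproduced deterministically with a small stand-alone sketch. This is not Spark code: FakeScheduler, FakeHeartbeatReceiver, HeartbeatRaceDemo, the pending queue (standing in for eventLoopThread), and the locally defined HeartbeatResponse are all hypothetical names introduced for illustration. It also assumes that by the time the queued heartbeat task runs, the scheduler has already forgotten the executor.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for Spark's HeartbeatResponse.
final case class HeartbeatResponse(reregisterBlockManager: Boolean)

// Hypothetical stand-in for the TaskScheduler: returns false for unknown executors.
final class FakeScheduler {
  val knownExecutors: mutable.Set[String] = mutable.Set.empty[String]
  def executorHeartbeatReceived(executorId: String): Boolean =
    knownExecutors.contains(executorId)
}

final class FakeHeartbeatReceiver(scheduler: FakeScheduler) {
  private val executorLastSeen = mutable.Map.empty[String, Long]
  // Stand-in for eventLoopThread: tasks are queued and only run on drain(),
  // so the interleaving from the description can be forced.
  private val pending = mutable.Queue.empty[() => Unit]
  val replies: mutable.Buffer[HeartbeatResponse] = mutable.Buffer.empty[HeartbeatResponse]

  def executorRegistered(id: String): Unit =
    executorLastSeen(id) = System.currentTimeMillis()

  def executorRemoved(id: String): Unit =
    executorLastSeen.remove(id)

  def heartbeat(id: String): Unit = {
    if (executorLastSeen.contains(id)) {
      executorLastSeen(id) = System.currentTimeMillis()
      // The scheduler is consulted later, when the queued task actually runs.
      pending.enqueue(() => {
        val unknownExecutor = !scheduler.executorHeartbeatReceived(id)
        replies += HeartbeatResponse(reregisterBlockManager = unknownExecutor)
      })
    } else {
      replies += HeartbeatResponse(reregisterBlockManager = false)
    }
  }

  // Run every queued task, in submission order.
  def drain(): Unit = while (pending.nonEmpty) pending.dequeue()()
}

object HeartbeatRaceDemo {
  def run(): HeartbeatResponse = {
    val scheduler = new FakeScheduler
    val receiver = new FakeHeartbeatReceiver(scheduler)
    receiver.executorRegistered("1")
    scheduler.knownExecutors += "1"

    receiver.heartbeat("1")          // step 1: task queued, no result yet
    receiver.executorRemoved("1")    // step 2: executor removed
    scheduler.knownExecutors -= "1"  // assumption: scheduler dropped it as well
    receiver.drain()                 // the queued heartbeat task runs only now

    receiver.replies.last
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this sketch, HeartbeatRaceDemo.run() returns a response with reregisterBlockManager = true even though the executor was already removed, which matches the behaviour reported above. If heartbeat("1") is called only after executorRemoved("1"), the else branch replies with reregisterBlockManager = false instead.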




> Error Executor info in SparkUI ExecutorsTab
> -------------------------------------------
>
>                 Key: SPARK-17582
>                 URL: https://issues.apache.org/jira/browse/SPARK-17582
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: xukun
>            Priority: Minor


