You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/02/12 18:07:02 UTC

[GitHub] vanzin commented on a change in pull request #23706: [SPARK-26790][CORE] Change approach for retrieving executor logs and attributes: self-retrieve

vanzin commented on a change in pull request #23706: [SPARK-26790][CORE] Change approach for retrieving executor logs and attributes: self-retrieve
URL: https://github.com/apache/spark/pull/23706#discussion_r256078306
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/executor/BaseCoarseGrainedExecutorBackend.scala
 ##########
 @@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import java.net.URL
+import java.nio.ByteBuffer
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicBoolean
+
+import scala.util.{Failure, Success}
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.TaskState.TaskState
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.worker.WorkerWatcher
+import org.apache.spark.executor.CoarseGrainedExecutorBackend.log
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
+import org.apache.spark.scheduler.{ExecutorLossReason, TaskDescription}
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
+import org.apache.spark.serializer.SerializerInstance
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+private[spark] abstract class BaseCoarseGrainedExecutorBackend(
+    override val rpcEnv: RpcEnv,
+    driverUrl: String,
+    executorId: String,
+    hostname: String,
+    cores: Int,
+    userClassPath: Seq[URL],
+    env: SparkEnv)
+  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {
+
+  private[this] val stopping = new AtomicBoolean(false)
+  var executor: Executor = null
+  @volatile var driver: Option[RpcEndpointRef] = None
+
+  // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
+  // to be changed so that we don't share the serializer instance across threads
+  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()
+
+  override def onStart() {
+    logInfo("Connecting to driver: " + driverUrl)
+    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
+      driver = Some(ref)
+      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
+        extractAttributes))
+    }(ThreadUtils.sameThread).onComplete {
+      // This is a very fast action so we can use "ThreadUtils.sameThread"
+      case Success(msg) =>
+      // Always receive `true`. Just ignore it
+      case Failure(e) =>
+        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
+    }(ThreadUtils.sameThread)
+  }
+
+  override def receive: PartialFunction[Any, Unit] = {
+    case RegisteredExecutor =>
+      logInfo("Successfully registered with driver")
+      try {
+        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
+      } catch {
+        case NonFatal(e) =>
+          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
+      }
+
+    case RegisterExecutorFailed(message) =>
+      exitExecutor(1, "Slave registration failed: " + message)
+
+    case LaunchTask(data) =>
+      if (executor == null) {
+        exitExecutor(1, "Received LaunchTask command but executor was null")
+      } else {
+        val taskDesc = TaskDescription.decode(data.value)
+        logInfo("Got assigned task " + taskDesc.taskId)
+        executor.launchTask(this, taskDesc)
+      }
+
+    case KillTask(taskId, _, interruptThread, reason) =>
+      if (executor == null) {
+        exitExecutor(1, "Received KillTask command but executor was null")
+      } else {
+        executor.killTask(taskId, interruptThread, reason)
+      }
+
+    case StopExecutor =>
+      stopping.set(true)
+      logInfo("Driver commanded a shutdown")
+      // Cannot shutdown here because an ack may need to be sent back to the caller. So send
+      // a message to self to actually do the shutdown.
+      self.send(Shutdown)
+
+    case Shutdown =>
+      stopping.set(true)
+      new Thread("CoarseGrainedExecutorBackend-stop-executor") {
+        override def run(): Unit = {
+          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
+          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
+          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
+          // Therefore, we put this line in a new thread.
+          executor.stop()
+        }
+      }.start()
+
+    case UpdateDelegationTokens(tokenBytes) =>
+      logInfo(s"Received tokens of ${tokenBytes.length} bytes")
+      SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+  }
+
+  override def onDisconnected(remoteAddress: RpcAddress): Unit = {
+    if (stopping.get()) {
+      logInfo(s"Driver from $remoteAddress disconnected during shutdown")
+    } else if (driver.exists(_.address == remoteAddress)) {
+      exitExecutor(1, s"Driver $remoteAddress disassociated! Shutting down.", null,
+        notifyDriver = false)
+    } else {
+      logWarning(s"An unknown ($remoteAddress) driver disconnected.")
+    }
+  }
+
+  override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
+    val msg = StatusUpdate(executorId, taskId, state, data)
+    driver match {
+      case Some(driverRef) => driverRef.send(msg)
+      case None => logWarning(s"Drop $msg because has not yet connected to driver")
+    }
+  }
+
+  def extractLogUrls: Map[String, String]
+
+  def extractAttributes: Map[String, String]
+
+  /**
+   * This function can be overloaded by other child classes to handle
+   * executor exits differently. For e.g. when an executor goes down,
+   * back-end may not want to take the parent process down.
+   */
+  protected def exitExecutor(
+      code: Int,
+      reason: String,
+      throwable: Throwable = null,
+      notifyDriver: Boolean = true) = {
+    val message = "Executor self-exiting due to : " + reason
+    if (throwable != null) {
+      logError(message, throwable)
+    } else {
+      logError(message)
+    }
+
+    if (notifyDriver && driver.nonEmpty) {
+      driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
+    }
+
+    System.exit(code)
+  }
+}
+
+object BaseCoarseGrainedExecutorBackend {
+  private[spark] def run(
 
 Review comment:
   Do you actually need different command line parsing for the YARN version? Up to now they've been the same, so it seems to me they should remain the same.
   
   So if instead of this, you add `main(args, backendCreateFn)` to `CoarseGrainedExecutorBackend`, you could share more code.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org