You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to reviews@spark.apache.org by bbossy <gi...@git.apache.org> on 2016/02/15 18:27:46 UTC

[GitHub] spark pull request: [SPARK-12583][Mesos] Fix mesos shuffle service

Github user bbossy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11207#discussion_r52925163
  
    --- Diff: core/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala ---
    @@ -67,33 +76,162 @@ private[mesos] class MesosExternalShuffleBlockHandler(transportConf: TransportCo
        */
       override def channelInactive(client: TransportClient): Unit = {
         val address = client.getSocketAddress
    -    if (connectedApps.contains(address)) {
    -      val appId = connectedApps(address)
    -      logInfo(s"Application $appId disconnected (address was $address).")
    -      applicationRemoved(appId, true /* cleanupLocalDirs */)
    -      connectedApps.remove(address)
    -    } else {
    -      logWarning(s"Unknown $address disconnected.")
    -    }
    +    // Disconnects no longer trigger cleanup: shuffle files are removed by
    +    // MesosFrameworkCleaner once the application's Mesos framework has not been reported as
    +    // running for longer than frameworkTimeoutMs.
    +    logInfo(s"Socket disconnected (address was $address).")
       }
     
       /** An extractor object for matching [[RegisterDriver]] message. */
       private object RegisterDriverParam {
    +    // Always matches: the result is always Some, carrying the message's application id.
         def unapply(r: RegisterDriver): Option[String] = Some(r.getAppId)
       }
    +
    +  /**
    +   * Periodically checks which Mesos frameworks are running and removes the shuffle files of
    +   * registered applications whose framework has not been reported as running for longer than
    +   * `frameworkTimeoutMs`.
    +   *
    +   * The running frameworks are read from the leading master's `/master/state.json`. When
    +   * `sparkMaster` is a ZooKeeper URI (`mesos://zk://...`), or when a master configured without
    +   * ZooKeeper reports a `zk` flag at start-up, the leader is resolved with `mesos-resolve` and
    +   * re-resolved whenever the known leader cannot be queried.
    +   */
    +  private class MesosFrameworkCleaner extends Runnable {
    +
    +    // Must be initialized before `mesosLeader`, whose initializer already queries the master.
    +    private val objectMapper = new ObjectMapper()
    +
    +    // The Zookeeper URI if mesos is running in HA mode with zookeeper
    +    // (e.g. zk://zk1:port1,zk2:port2,zk3:port3/mesos); None in non-HA mode. Only the scheme is
    +    // matched case-insensitively: ZooKeeper paths are case-sensitive, so the rest keeps its case.
    +    private var zkUri: Option[String] =
    +      if (sparkMaster.toLowerCase(java.util.Locale.ROOT).startsWith("mesos://zk://")) {
    +        Some(stripMesosScheme(sparkMaster))
    +      } else {
    +        None
    +      }
    +
    +    // The currently known mesos leader (host:port).
    +    private var mesosLeader: String = zkUri match {
    +      case Some(zk) => getLeaderFromZk(zk)
    +      case None => verifyNonHaMaster()
    +    }
    +
    +    /** Removes a leading `mesos://` (matched case-insensitively) without changing the rest. */
    +    private def stripMesosScheme(master: String): String = {
    +      val scheme = "mesos://"
    +      if (master.regionMatches(true, 0, scheme, 0, scheme.length)) {
    +        master.substring(scheme.length)
    +      } else {
    +        master
    +      }
    +    }
    +
    +    /**
    +     * Start-up check for a master configured without ZooKeeper. If the master reports a `zk`
    +     * flag, switches to HA mode (sets `zkUri`) and returns the leader resolved from ZooKeeper;
    +     * otherwise returns the configured master address. Exits the JVM if the master's state
    +     * cannot be retrieved.
    +     */
    +    private def verifyNonHaMaster(): String = {
    +      val sparkMasterUri = stripMesosScheme(sparkMaster)
    +      getMasterStateObj(sparkMasterUri) match {
    +        case None =>
    +          logError(s"Unable to retrieve mesos state on start-up from $sparkMaster (non-HA " +
    +            s"configuration). Verify that spark.master points to a running mesos master and " +
    +            s"restart the shuffle service.")
    +          System.exit(-1)
    +          sparkMasterUri
    +        case Some(stateObj) =>
    +          getZkFromStateObj(stateObj) match {
    +            case Some(zk) =>
    +              logWarning(s"Shuffle service was started with a non-HA master ($sparkMaster) but a " +
    +                s"HA configuration was detected. Reconfiguring shuffle service to use " +
    +                s"'mesos://$zk' as 'spark.master'. You might want to fix your configuration.")
    +              zkUri = Some(zk)
    +              getLeaderFromZk(zk)
    +            case None =>
    +              // Started as non-HA. Detected non-HA.
    +              sparkMasterUri
    +          }
    +      }
    +    }
    +
    +    /**
    +     * Resolves the current Mesos leader (host:port) from ZooKeeper with `mesos-resolve`.
    +     *
    +     * @throws RuntimeException ("Nonzero exit value: 255") if the leader can't be determined
    +     *                          within mesos-resolve's timeout (5 seconds).
    +     */
    +    private def getLeaderFromZk(zkUri: String): String = {
    +      // Passing the arguments separately avoids splitting the command line on whitespace.
    +      val leaderFromZk = scala.sys.process.Process(Seq("mesos-resolve", zkUri)).!!.trim
    +      logTrace(s"Retrieved mesos leader $leaderFromZk from Zookeeper.")
    +      leaderFromZk
    +    }
    +
    +    /**
    +     * Fetches and parses `http://<master>/master/state.json`.
    +     *
    +     * @param master `host:port` of the Mesos master to query.
    +     * @return the parsed state, or None if the request fails (timeout, connection refused,
    +     *         unknown host, non-200 status or malformed JSON).
    +     */
    +    private def getMasterStateObj(master: String): Option[JsonNode] = {
    +      val stateUrl = s"http://${master}/master/state.json"
    +      val timeoutMs = 5000 // 5 secs
    +      try {
    +        val conn = new URL(stateUrl).openConnection().asInstanceOf[HttpURLConnection]
    +        try {
    +          conn.setRequestMethod("GET")
    +          conn.setConnectTimeout(timeoutMs)
    +          // Without a read timeout, a master that accepts the connection but never answers
    +          // would block this thread indefinitely.
    +          conn.setReadTimeout(timeoutMs)
    +          val status = conn.getResponseCode
    +          if (status == 200) {
    +            val in = conn.getInputStream
    +            try Some(objectMapper.readTree(in)) finally in.close()
    +          } else {
    +            logWarning(s"Request to $stateUrl returned HTTP status $status.")
    +            None
    +          }
    +        } finally {
    +          // Releases the connection on every path, including non-200 responses.
    +          conn.disconnect()
    +        }
    +      } catch {
    +        case _: SocketTimeoutException =>
    +          logError(s"Connection to mesos leader at $stateUrl timed out.")
    +          None
    +        case e: java.io.IOException =>
    +          // e.g. connection refused by a former leader that went down: returning None lets the
    +          // caller look up the new leader instead of failing.
    +          logError(s"Failed to retrieve mesos state from $stateUrl.", e)
    +          None
    +      }
    +    }
    +
    +    /** Returns the leader's `host:port` from state.json (dropping the `master@` prefix). */
    +    private def getLeaderFromStateObj(stateObj: JsonNode): Option[String] = {
    +      Option(stateObj.get("leader")).map(_.asText().stripPrefix("master@"))
    +    }
    +
    +    /**
    +     * Returns the ids of the frameworks listed in state.json, or None if the response has no
    +     * `frameworks` entry (a malformed response must not make every application look finished).
    +     */
    +    private def getRunningFrameworks(stateObj: JsonNode): Option[Set[String]] = {
    +      Option(stateObj.get("frameworks")).map { frameworks =>
    +        frameworks.elements().asScala.map(_.path("id").asText()).filter(_.nonEmpty).toSet
    +      }
    +    }
    +
    +    /** Returns the value of the master's `zk` flag, if it has one. */
    +    private def getZkFromStateObj(stateObj: JsonNode): Option[String] = {
    +      Option(stateObj.get("flags")).flatMap(flags => Option(flags.get("zk"))).map(_.asText())
    +    }
    +
    +    /**
    +     * Runs one polling round. Exceptions are logged instead of propagated: if this task is
    +     * scheduled with `ScheduledExecutorService.scheduleAtFixedRate` (presumably how the
    +     * enclosing handler runs it), an exception escaping `run()` would suppress all later
    +     * executions and silently stop cleanup.
    +     */
    +    override def run(): Unit = {
    +      try {
    +        pollMesosState()
    +      } catch {
    +        case scala.util.control.NonFatal(e) =>
    +          logError("Failed to check mesos for running frameworks. Will retry.", e)
    +      }
    +    }
    +
    +    /** Queries the known leader and either follows a leader change or expires stale apps. */
    +    private def pollMesosState(): Unit = {
    +      getMasterStateObj(mesosLeader) match {
    +        case None =>
    +          zkUri match {
    +            case Some(zk) =>
    +              mesosLeader = getLeaderFromZk(zk)
    +              logInfo(s"Failed to retrieve mesos state, but found a new leader: $mesosLeader. " +
    +                s"Will retry.")
    +            case None =>
    +              logError("Failed to retrieve mesos (non-HA) state.")
    +          }
    +        case Some(state) =>
    +          getLeaderFromStateObj(state) match {
    +            case None =>
    +              logError("Failed to determine mesos leader from state.json")
    +            case Some(leader) if leader != mesosLeader =>
    +              logInfo(s"Got a new leader ($leader) from state.json. Will retry with the new " +
    +                s"leader.")
    +              mesosLeader = leader
    +            case Some(_) =>
    +              // definitely got the state from the leader
    +              getRunningFrameworks(state) match {
    +                case Some(runningFrameworks) => expireFinishedApps(runningFrameworks)
    +                case None => logError("Failed to determine running frameworks from state.json")
    +              }
    +          }
    +      }
    +    }
    +
    +    /**
    +     * Marks every registered app whose framework is running as seen now, then removes the
    +     * shuffle files of registered apps not seen for longer than `frameworkTimeoutMs`.
    +     */
    +    private def expireFinishedApps(runningFrameworks: Set[String]): Unit = {
    +      val now = System.nanoTime()
    +      // replace() only updates keys that are already present, so unregistered frameworks are
    +      // ignored without a separate containsKey check.
    +      runningFrameworks.foreach(id => connectedApps.replace(id, now))
    +      // toNanos saturates instead of overflowing for very large timeouts.
    +      val timeoutNs = java.util.concurrent.TimeUnit.MILLISECONDS.toNanos(frameworkTimeoutMs)
    +      connectedApps.asScala.foreach { case (appId, lastSeen) =>
    +        // The two-argument remove() only succeeds if the entry still holds `lastSeen`, so an
    +        // app that was refreshed or re-registered concurrently is not cleaned up by mistake.
    +        if (now - lastSeen > timeoutNs && connectedApps.remove(appId, lastSeen)) {
    +          logInfo(s"Application $appId has timed out. Removing shuffle files.")
    +          applicationRemoved(appId, true /* cleanupLocalDirs */)
    +        }
    +      }
    +    }
    +  }
     }
     
     /**
      * A wrapper of [[ExternalShuffleService]] that provides an additional endpoint for drivers
    - * to associate with. This allows the shuffle service to detect when a driver is terminated
    - * and can clean up the associated shuffle files.
    + * to register with. This allows the shuffle service to detect when a Mesos framework is no
    + * longer running and to clean up the associated shuffle files after a timeout.
      */
     private[mesos] class MesosExternalShuffleService(conf: SparkConf, securityManager: SecurityManager)
       extends ExternalShuffleService(conf, securityManager) {
     
       protected override def newShuffleBlockHandler(
           conf: TransportConf): ExternalShuffleBlockHandler = {
    -    new MesosExternalShuffleBlockHandler(conf)
    +    new MesosExternalShuffleBlockHandler(
    +      conf,
    +      this.conf.get("spark.master"),
    +      this.conf.getTimeAsMs("spark.storage.blockManagerSlaveTimeoutMs", "120s"))
    --- End diff --
    
    Should I use a different config key here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org