Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2014/08/18 23:10:25 UTC

[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/2020

    [SPARK-2933] [yarn] Refactor and cleanup Yarn AM code.

    This change modifies the Yarn module so that all the logic related
    to running the ApplicationMaster is localized. Instead of the previous
    4 different classes with mostly identical code, we now have:
    
    - A single, shared ApplicationMaster class, which can operate in both
      client and cluster mode, and replaces the old ApplicationMaster
      (for cluster mode) and ExecutorLauncher (for client mode).
    
    The benefit here is that all different execution modes for all supported
    yarn versions use the same shared code for monitoring executor allocation,
    setting up configuration, and monitoring the process's lifecycle.
    
    - A new YarnRMClient interface, which defines basic RM functionality needed
      by the ApplicationMaster. This interface has concrete implementations for
      each supported Yarn version.
    
    - A new YarnAllocator interface, which just abstracts the existing interface
      of the YarnAllocationHandler class. This is to avoid having to touch the
      allocator code too much in this change, although it might benefit from a
      similar effort in the future.
    
    The end result is code that is much easier to understand, with far less
    duplication, which makes it easier to fix bugs, add features, and test
    everything knowing that all supported versions will behave the same.
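
As a rough illustration of the structure described above, the following is a minimal sketch, not code from the patch. The names RMClientSketch, AlphaRMClientSketch, StableRMClientSketch and ApplicationMasterSketch, and the register method, are hypothetical stand-ins for YarnRMClient, its per-version implementations and the shared ApplicationMaster. The sketch only shows the idea of one shared class that receives a version-specific client.

```scala
// Hypothetical, simplified stand-ins; none of these signatures come from the patch.
trait RMClientSketch {
  // Name of the Yarn API flavor this client talks to.
  def apiName: String
  // Pretend to register the AM with the ResourceManager.
  def register(uiAddress: String): String
}

class AlphaRMClientSketch extends RMClientSketch {
  override val apiName: String = "alpha"
  override def register(uiAddress: String): String =
    s"[$apiName] registered AM with UI at $uiAddress"
}

class StableRMClientSketch extends RMClientSketch {
  override val apiName: String = "stable"
  override def register(uiAddress: String): String =
    s"[$apiName] registered AM with UI at $uiAddress"
}

// Shared AM logic: the mode and the version-specific client are passed in,
// so the control flow is written once for every mode/version combination.
class ApplicationMasterSketch(isDriver: Boolean, client: RMClientSketch) {
  def run(): String = {
    val mode = if (isDriver) "cluster" else "client"
    val registration = client.register("host:0")
    s"$registration in $mode mode"
  }
}

object AmSketchDemo {
  def main(args: Array[String]): Unit = {
    println(new ApplicationMasterSketch(isDriver = true, new StableRMClientSketch).run())
    println(new ApplicationMasterSketch(isDriver = false, new AlphaRMClientSketch).run())
  }
}
```

Passing the client in through the constructor is what lets the same class serve both Yarn API versions; the real interface presumably exposes more operations than this single method.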


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-2933

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2020
    
----
commit 3630f1e8d02ad4d77ce56390b52bad8bbbcd4691
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-08T20:51:25Z

    [SPARK-2933] [yarn] Refactor and cleanup Yarn AM code.
    

commit c00be0d5f31fed394ccb41e62fc8aeb614a57e0b
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-14T22:28:22Z

    Changes to the yarn-alpha project to use common AM code.
    
    Made some tweaks to the YarnAllocator interface to cover both
    APIs more easily. There's still a lot of cleanup possible on
    that front, but I'll leave that as a separate task.

commit cec283a831b856f505f1c20a7e4e7c869808b641
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-14T23:15:46Z

    Trivial cleanups.

commit a694d0823f383a68d58813f6e33bcaebb87882fe
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-15T00:43:33Z

    Fix UI filter registration.

commit e73d00e0415a31d3785964b958c101e49e466406
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-15T01:45:44Z

    Keep "ExecutorLauncher" as the main class for client-mode AM.

commit fd699fd9449e0e7c6a4ec06b4260bb11c093efe2
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-15T02:09:37Z

    Finish app if SparkContext initialization times out.
    
    This avoids the NPEs that would happen if code just kept going.

commit 6145a986564e12afa028c066dde4042057d74447
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-15T02:20:19Z

    Fix some questionable error handling.

commit f3eb8dc4ed8ba33f2b24b0211067952af3c2cfbb
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-15T02:30:17Z

    More trivial cleanup.

commit ccee155a81c5f6627c45f11b5c36f0b90b00dfac
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-15T02:39:01Z

    Move cluster/client code to separate methods.
    
    Makes code a little cleaner and easier to follow.

commit 6a7b07a11ca5aab21bfb4e245e514e9b5f7925b9
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-18T18:02:04Z

    Some more cleanup.

commit 30968c0f61ec3c49e293f1529973c4e1db5b93b6
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-18T18:17:30Z

    Restore shutdown hook to clean up staging dir.

commit 5100474aa46627e345951977d48e178ea793850f
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2014-08-18T18:26:09Z

    Cleanup a couple more constants.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16501006
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +  private val userResult = new AtomicBoolean(false)
    +
    +  final def run(): Unit = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    // If this is the last attempt, register a shutdown hook to cleanup the staging dir
    +    // after the app is finished, in case it does not exit through the expected means.
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    if (isLastAttempt()) {
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          logInfo("AppMaster received a signal.")
    +          if (!finished) {
    +            cleanupStagingDir()
    +          }
    +        }
    +      }
    +      ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +    }
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    val success =
    +      try {
    +        if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
    +      } catch {
    +        case e: Exception =>
    +          logError("Exception while running AM main loop.", e)
    +          false
    +      }
    +
    +    finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
    +    val shouldCleanup = success || isLastAttempt()
    +    if (shouldCleanup) {
    +      cleanupStagingDir()
    --- End diff --
    
    I see. I may need to make adjustments elsewhere in the code, so just to make sure: Yarn will only re-run the AM when it exits with a non-zero status, right?




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16426461
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import scala.collection.{Map, Set}
    +
    +import org.apache.hadoop.net.NetUtils
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +import org.apache.hadoop.yarn.ipc.YarnRPC
    +import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.scheduler.SplitInfo
    +import org.apache.spark.util.Utils
    +
    +/**
    + * YarnRMClient implementation for the Yarn alpha API.
    --- End diff --
    
    I'll add that to the interface.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16730231
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    --- End diff --
    
    Hmmm. Works fine in stable, and interrupting "self" should be ok in general. But I guess it's safer to not do it (and avoid the sleep in the reporter thread when the AM is finished).
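
One way to avoid the self-interrupt discussed in this comment is to check which thread is calling before interrupting. The following is a minimal sketch under that assumption, not the change that was actually made; the object ReporterInterruptSketch and the helper interruptUnlessSelf are hypothetical names, and the null check is an added defensive assumption.

```scala
object ReporterInterruptSketch {
  // Interrupts `reporter` unless the caller is already running on that thread.
  // Returns true when interrupt() was actually called.
  def interruptUnlessSelf(reporter: Thread): Boolean = {
    if (reporter != null && (Thread.currentThread() ne reporter)) {
      reporter.interrupt()
      true
    } else {
      false
    }
  }

  def main(args: Array[String]): Unit = {
    val reporter = new Thread(new Runnable {
      override def run(): Unit = {
        // Stand-in for the reporter loop's sleep between heartbeats.
        try Thread.sleep(60000L)
        catch { case _: InterruptedException => () } // woken up early
      }
    })
    reporter.setDaemon(true)
    reporter.start()

    println(interruptUnlessSelf(reporter))               // called from another thread
    println(interruptUnlessSelf(Thread.currentThread())) // the "self" case
    reporter.join()
  }
}
```

With a guard like this, a finish call made from the reporter thread itself would skip the interrupt, and the loop would need to notice the finished flag on its own instead of sleeping again.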




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16497648
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +  private val userResult = new AtomicBoolean(false)
    +
    +  final def run(): Unit = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    // If this is the last attempt, register a shutdown hook to cleanup the staging dir
    +    // after the app is finished, in case it does not exit through the expected means.
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    if (isLastAttempt()) {
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          logInfo("AppMaster received a signal.")
    +          if (!finished) {
    +            cleanupStagingDir()
    +          }
    +        }
    +      }
    +      ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +    }
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    val success =
    +      try {
    +        if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
    +      } catch {
    +        case e: Exception =>
    +          logError("Exception while running AM main loop.", e)
    +          false
    +      }
    +
    +    finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
    +    val shouldCleanup = success || isLastAttempt()
    +    if (shouldCleanup) {
    +      cleanupStagingDir()
    --- End diff --
    
    This isn't quite right. We also want to clean the staging directory if the final status is failed. The only time we don't want to clean it is if the AM is going to be rerun. We already have the shutdown hook in place; we could change it to be registered all the time, instead of only when isLastAttempt(), and then it would handle the cleanup.
    
    I just found a bug in the existing code where it doesn't clean up properly; I might put up a PR to fix it in branch-1.1.
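
The rule in this comment can be written as a small predicate. The following is a minimal sketch that mirrors the `finished || isLastAttempt` condition in the later revision of ApplicationMaster.scala quoted elsewhere in this thread. The object StagingCleanupSketch and its method names are hypothetical. It assumes that "finished" means a final status (succeeded or failed) has been reported, so the AM will not be rerun.

```scala
object StagingCleanupSketch {
  // True when no further AM attempts remain after this one.
  def isLastAttempt(attemptId: Int, maxAppAttempts: Int): Boolean =
    attemptId >= maxAppAttempts

  // Clean up unless the AM may be rerun: either a final status was reported
  // (finished, whether succeeded or failed) or this was the last attempt.
  def shouldCleanStagingDir(finished: Boolean, attemptId: Int, maxAppAttempts: Int): Boolean =
    finished || isLastAttempt(attemptId, maxAppAttempts)

  def main(args: Array[String]): Unit = {
    println(shouldCleanStagingDir(finished = true, attemptId = 1, maxAppAttempts = 2))
    println(shouldCleanStagingDir(finished = false, attemptId = 2, maxAppAttempts = 2))
    println(shouldCleanStagingDir(finished = false, attemptId = 1, maxAppAttempts = 2))
  }
}
```

The only case that keeps the staging directory is an unfinished AM with attempts remaining, which is the case where a rerun would still need those files.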




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16729816
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    --- End diff --
    
    finish() is called from the reporterThread itself in the checkNumExecutorsFailed case, so interrupting that thread here keeps it from finishing its cleanup, such as setting the diagnostics message and calling sc.stop().
    
    On hadoop 0.23 this results in the application hanging.
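
    To illustrate the concern, here is a minimal, self-contained sketch (not the code from this PR). The names SelfInterruptSketch, cleanupCompletes and guardSelfInterrupt are hypothetical, and Thread.sleep stands in for whatever blocking cleanup work the reporter thread still has to do. It assumes that one possible guard is to skip the interrupt when finish() is already running on the reporter thread.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object SelfInterruptSketch {

  /**
   * Starts a stand-in "reporter" thread that calls finish() on itself and then
   * attempts a blocking cleanup step. Returns true if the cleanup completed.
   */
  def cleanupCompletes(guardSelfInterrupt: Boolean): Boolean = {
    val cleanedUp = new AtomicBoolean(false)
    var reporter: Thread = null

    // Hypothetical stand-in for ApplicationMaster.finish().
    def finish(): Unit = {
      // Without the guard, the reporter thread sets its own interrupt flag.
      if (!guardSelfInterrupt || Thread.currentThread() != reporter) {
        reporter.interrupt()
      }
    }

    reporter = new Thread(new Runnable {
      override def run(): Unit = {
        finish()
        try {
          // Stand-in for blocking cleanup work. A pending interrupt makes
          // this throw InterruptedException immediately.
          Thread.sleep(10)
          cleanedUp.set(true)
        } catch {
          case _: InterruptedException => () // cleanup was cut short
        }
      }
    })
    reporter.start()
    reporter.join()
    cleanedUp.get()
  }

  def main(args: Array[String]): Unit = {
    println("unguarded cleanup completed: " + cleanupCompletes(guardSelfInterrupt = false))
    println("guarded cleanup completed: " + cleanupCompletes(guardSelfInterrupt = true))
  }
}
```

    In the unguarded variant the pending interrupt makes the first blocking call on the reporter thread fail, which is the kind of cut-short cleanup described above. Whether skipping the interrupt is the right fix for the real class is a separate question.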


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52917400
  
    @vanzin Yes, I believe you are correct; in most cases it just passes SUCCEEDED. We really need to bring client mode up to par, but I was hoping to move it to an unmanaged AM so that things would share more of the same code path.
    
    I would like to get this in soon to minimize the pain of you having to keep upmerging and me having to keep reviewing. I'm thinking about putting this into master only, since the 1.1 release is pretty close. Any objections?


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53596677
  
    OK, this looks good. +1. Thanks for taking this on, @vanzin!


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16421159
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala ---
    @@ -0,0 +1,103 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import scala.collection.{Map, Set}
    +
    +import org.apache.hadoop.net.NetUtils
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.api.protocolrecords._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +import org.apache.hadoop.yarn.ipc.YarnRPC
    +import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
    +
    +import org.apache.spark.{Logging, SparkConf}
    +import org.apache.spark.scheduler.SplitInfo
    +import org.apache.spark.util.Utils
    +
    +/**
    + * YarnRMClient implementation for the Yarn alpha API.
    --- End diff --
    
    It might be nice to add a bit more description, e.g. that it contains the base functionality the Spark application master needs to talk to the Yarn ResourceManager.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53597239
  
    I committed this just to master.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16680925
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    +      try {
    +        if (Thread.currentThread() != reporterThread) {
    +          reporterThread.join()
    +        }
    +      } finally {
    +        client.shutdown(status, Option(diagnostics).getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  private def sparkContextInitialized(sc: SparkContext) = {
    +    sparkContextRef.synchronized {
    +      sparkContextRef.compareAndSet(null, sc)
    +      sparkContextRef.notifyAll()
    +    }
    +  }
    +
    +  private def sparkContextStopped(sc: SparkContext) = {
    +    sparkContextRef.compareAndSet(sc, null)
    +  }
    +
    +  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
    +    val sc = sparkContextRef.get()
    +    allocator = client.register(yarnConf,
    +      if (sc != null) sc.getConf else sparkConf,
    +      if (sc != null) sc.preferredNodeLocationData else Map(),
    +      uiAddress,
    +      uiHistoryAddress)
    +
    +    allocator.allocateResources()
    +    reporterThread = launchReporterThread()
    +  }
    +
    +  private def runDriver(): Unit = {
    +    addAmIpFilter()
    +    val userThread = startUserClass()
    +
    +    // This a bit hacky, but we need to wait until the spark.driver.port property has
    +    // been set by the Thread executing the user class.
    +    val sc = waitForSparkContextInitialized()
    +
    +    // If there is no SparkContext at this point, just fail the app.
    +    if (sc == null) {
    +      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
    +    } else {
    +      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
    +      try {
    +        userThread.join()
    +      } finally {
    +        // In cluster mode, ask the reporter thread to stop since the user app is finished.
    +        reporterThread.interrupt()
    +      }
    +    }
    +  }
    +
    +  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    +    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
    +      conf = sparkConf, securityManager = securityMgr)._1
    +    actor = waitForSparkDriver()
    +    addAmIpFilter()
    +    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), "")
    --- End diff --
    
    Hmm, I think I overlooked this one. Anyway, the code in master was sort of doing the wrong thing; I have fixed it now (and tested it).


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52572591
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18793/consoleFull) for   PR 2020 at commit [`557fdeb`](https://github.com/apache/spark/commit/557fdeba535d4517bb79502240cfddab561a35c3).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52564236
  
    No, I don't believe I changed anything in that regard. The goal was to keep all the current functionality intact.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52941383
  
    Yes, absolutely. I wasn't planning to get this into 1.1, since I might have introduced some subtle bugs.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16732235
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    --- End diff --
    
    FYI: I tested this by running a 32-bit JDK while trying to allocate executors with more than 4G of memory.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52853573
  
    @tgravescs I think I fixed the staging dir cleanup now. There is still an (unrelated) issue in the code - in client mode, the app's status is not propagated all the way to the RM as far as I can see, so it always ends up being "SUCCEEDED". But I think that was the case before too. That should be easy to fix but better done in a separate change (I think there's even a bug open for it?).
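
    For reference, the staging dir cleanup condition, as it appears in the shutdown hook quoted in this thread, can be written as a small pure function. This is only a sketch: StagingCleanupSketch and shouldCleanupStagingDir are hypothetical names, and the attempt id and max attempts are passed in as plain Ints instead of being read from the YarnRMClient.

```scala
object StagingCleanupSketch {

  /**
   * Mirrors the quoted condition: clean up the staging dir once the app has
   * finished, or when this is the last attempt at running the AM.
   */
  def shouldCleanupStagingDir(
      finished: Boolean,
      attemptId: Int,
      maxAppAttempts: Int): Boolean = {
    val isLastAttempt = attemptId >= maxAppAttempts
    finished || isLastAttempt
  }

  def main(args: Array[String]): Unit = {
    // An unfinished first attempt of two keeps the staging dir for the retry.
    println(shouldCleanupStagingDir(finished = false, attemptId = 1, maxAppAttempts = 2))
    // The last attempt cleans up even if the app never finished.
    println(shouldCleanupStagingDir(finished = false, attemptId = 2, maxAppAttempts = 2))
    // A finished app cleans up regardless of the attempt number.
    println(shouldCleanupStagingDir(finished = true, attemptId = 1, maxAppAttempts = 2))
  }
}
```

    Keeping the decision separate from the hook would make it straightforward to unit test without a running RM.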


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52566977
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18793/consoleFull) for   PR 2020 at commit [`557fdeb`](https://github.com/apache/spark/commit/557fdeba535d4517bb79502240cfddab561a35c3).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52954226
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19061/consoleFull) for   PR 2020 at commit [`41f8c8a`](https://github.com/apache/spark/commit/41f8c8a2ef588c1ee90c58189d0c8892c88251e9).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52555930
  
    Man, I just merged with master and there's already a conflict... will update shortly.


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53332353
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19150/consoleFull) for PR 2020 at commit [`0f5142c`](https://github.com/apache/spark/commit/0f5142cc8ae00cde0ac96c09ae28c528d316823f).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52564053
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18783/consoleFull) for PR 2020 at commit [`5100474`](https://github.com/apache/spark/commit/5100474aa46627e345951977d48e178ea793850f).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!





[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16420128
  
    --- Diff: yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala ---
    @@ -59,16 +50,14 @@ object AllocationType extends Enumeration {
      * Acquires resources for executors from a ResourceManager and launches executors in new containers.
      */
     private[yarn] class YarnAllocationHandler(
    -    val conf: Configuration,
    -    val resourceManager: AMRMProtocol,
    -    val appAttemptId: ApplicationAttemptId,
    -    val maxExecutors: Int,
    -    val executorMemory: Int,
    -    val executorCores: Int,
    -    val preferredHostToCount: Map[String, Int],
    -    val preferredRackToCount: Map[String, Int],
    -    val sparkConf: SparkConf)
    -  extends Logging {
    +    conf: Configuration,
    +    sparkConf: SparkConf,
    +    resourceManager: AMRMProtocol,
    +    appAttemptId: ApplicationAttemptId,
    +    args: ApplicationMasterArguments,
    +    map: collection.Map[String, collection.Set[SplitInfo]])
    --- End diff --
    
    It might be nice to have a more descriptive name than `map`.
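
To illustrate the naming suggestion, here is a minimal sketch of how the constructor could read with a descriptive parameter name. The name `preferredNodes`, the class `AllocationHandlerSketch`, and the stand-in `SplitInfo` case class are assumptions made for this example; none of them is taken from the patch, and the per-host count is only one plausible use of the data.

```scala
// Hypothetical stand-in for Spark's SplitInfo, so the sketch compiles on its own.
case class SplitInfo(path: String, hostLocation: String)

// `preferredNodes` is an assumed name: it says what the map holds
// (host name -> input splits that prefer that host), which `map` does not.
class AllocationHandlerSketch(
    preferredNodes: collection.Map[String, collection.Set[SplitInfo]]) {

  // Number of splits that prefer each host.
  def preferredHostToCount: Map[String, Int] =
    preferredNodes.map { case (host, splits) => host -> splits.size }.toMap
}
```

With a name like this, a call site such as `new AllocationHandlerSketch(preferredNodes = locations)` documents itself without a comment.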




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16430079
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---
    @@ -397,22 +390,37 @@ trait ClientBase extends Logging {
             .foreach(p => javaOpts += s"-Djava.library.path=$p")
         }
     
    -    // Command for the ApplicationMaster
    -    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
    -      javaOpts ++
    -      Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
    -        "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
    -        userArgsToString(args),
    -        "--executor-memory", args.executorMemory.toString,
    +    val userClass =
    +      if (args.userClass != null) {
    +        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
    +      } else {
    +        Nil
    +      }
    +    val amClass =
    +      if (isLaunchingDriver) {
    +        classOf[ApplicationMaster].getName()
    +      } else {
    +        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
    --- End diff --
    
    Isn't the ExecutorLauncher class gone now?
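
The concern can be reproduced in isolation. The sketch below mirrors the string replacement from the diff and then checks whether the derived name can be loaded. `AmClassNameSketch`, the nested stand-in `ApplicationMaster`, and the helper `classExists` are hypothetical names for this example, not part of the patch. A name built by string replacement is never checked by the compiler, so a missing class only shows up when the JVM tries to load it.

```scala
import scala.util.Try

object AmClassNameSketch {
  // Stand-in for the real ApplicationMaster class; for illustration only.
  class ApplicationMaster

  // Same derivation as in the diff: keep the package, swap the simple class name.
  def amClassName(isLaunchingDriver: Boolean): String = {
    val base = classOf[ApplicationMaster].getName
    if (isLaunchingDriver) base else base.replace("ApplicationMaster", "ExecutorLauncher")
  }

  // True only if a class with that name can be loaded (without initializing it).
  def classExists(name: String): Boolean =
    Try(Class.forName(name, false, getClass.getClassLoader)).isSuccess
}
```

In this sketch no `ExecutorLauncher` class is defined, so `classExists(amClassName(false))` is false while `classExists(amClassName(true))` is true. A check of this kind in a unit test would catch a stale class name before a launch fails at runtime.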




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16549583
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      reporterThread.interrupt()
    +      try {
    +        if (Thread.currentThread() != reporterThread) {
    +          reporterThread.join()
    +        }
    +      } finally {
    +        client.shutdown(status, Option(diagnostics).getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  private def sparkContextInitialized(sc: SparkContext) = {
    +    sparkContextRef.synchronized {
    +      sparkContextRef.compareAndSet(null, sc)
    +      sparkContextRef.notifyAll()
    +    }
    +  }
    +
    +  private def sparkContextStopped(sc: SparkContext) = {
    +    sparkContextRef.compareAndSet(sc, null)
    +  }
    +
    +  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
    +    val sc = sparkContextRef.get()
    +    allocator = client.register(yarnConf,
    +      if (sc != null) sc.getConf else sparkConf,
    +      if (sc != null) sc.preferredNodeLocationData else Map(),
    +      uiAddress,
    +      uiHistoryAddress)
    +
    +    allocator.allocateResources()
    +    reporterThread = launchReporterThread()
    +  }
    +
    +  private def runDriver(): Unit = {
    +    addAmIpFilter()
    +    val userThread = startUserClass()
    +
    +    // This is a bit hacky, but we need to wait until the spark.driver.port property has
    +    // been set by the Thread executing the user class.
    +    val sc = waitForSparkContextInitialized()
    +
    +    // If there is no SparkContext at this point, just fail the app.
    +    if (sc == null) {
    +      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
    +    } else {
    +      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
    +      try {
    +        userThread.join()
    +      } finally {
    +        // In cluster mode, ask the reporter thread to stop since the user app is finished.
    +        reporterThread.interrupt()
    +      }
    +    }
    +  }
    +
    +  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    +    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
    +      conf = sparkConf, securityManager = securityMgr)._1
    +    actor = waitForSparkDriver()
    +    addAmIpFilter()
    +    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), "")
    +
    +    // In client mode the actor will stop the reporter thread.
    +    reporterThread.join()
    +    finalStatus = FinalApplicationStatus.SUCCEEDED
    +  }
    +
    +  private def launchReporterThread(): Thread = {
    +    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    +    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    +
    +    // we want to be reasonably responsive without causing too many requests to RM.
    +    val schedulerInterval =
    +      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
    +
    +    // must be <= expiryInterval / 2.
    +    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
    +
    +    val t = new Thread {
    +      override def run() {
    +        while (!finished) {
    +          checkNumExecutorsFailed()
    +          logDebug("Sending progress")
    +          allocator.allocateResources()
    +          try {
    +            Thread.sleep(interval)
    +          } catch {
    +            case e: InterruptedException =>
    +          }
    +        }
    +      }
    +    }
    +    // setting to daemon status, though this is usually not a good idea.
    +    t.setDaemon(true)
    +    t.setName("Reporter")
    +    t.start()
    +    logInfo("Started progress reporter thread - sleep time : " + interval)
    +    t
    +  }
    +
    +  /**
    +   * Clean up the staging directory.
    +   */
    +  private def cleanupStagingDir() {
    +    val fs = FileSystem.get(yarnConf)
    +    var stagingDirPath: Path = null
    +    try {
    +      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
    +      if (!preserveFiles) {
    +        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
    +        if (stagingDirPath == null) {
    +          logError("Staging directory is null")
    +          return
    +        }
    +        logInfo("Deleting staging directory " + stagingDirPath)
    +        fs.delete(stagingDirPath, true)
    +      }
    +    } catch {
    +      case ioe: IOException =>
    +        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
    +    }
    +  }
    +
    +  // Note: this needs to happen before allocateExecutors.
    +  private def waitForSparkContextInitialized(): SparkContext = {
    +    logInfo("Waiting for spark context initialization")
    +    try {
    +      sparkContextRef.synchronized {
    +        var count = 0
    +        val waitTime = 10000L
    +        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
    +        while (sparkContextRef.get() == null && count < numTries && !finished) {
    +          logInfo("Waiting for spark context initialization ... " + count)
    +          count = count + 1
    +          sparkContextRef.wait(waitTime)
    +        }
    +
    +        val sparkContext = sparkContextRef.get()
    +        assert(sparkContext != null || count >= numTries)
    +        if (sparkContext == null) {
    +          logError(
    +            "Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".format(
    +              count * waitTime, numTries))
    +        }
    +        sparkContext
    +      }
    +    }
    +  }
    +
    +  private def waitForSparkDriver(): ActorRef = {
    +    logInfo("Waiting for Spark driver to be reachable.")
    +    var driverUp = false
    +    val hostport = args.userArgs(0)
    +    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    +    while (!driverUp) {
    +      try {
    +        val socket = new Socket(driverHost, driverPort)
    +        socket.close()
    +        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
    +        driverUp = true
    +      } catch {
    +        case e: Exception =>
    +          logError("Failed to connect to driver at %s:%s, retrying ...".
    +            format(driverHost, driverPort))
    +          Thread.sleep(100)
    +      }
    +    }
    +    sparkConf.set("spark.driver.host", driverHost)
    +    sparkConf.set("spark.driver.port", driverPort.toString)
    +
    +    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    +      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +    actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
    +  }
    +
    +  private def checkNumExecutorsFailed() = {
    +    if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
    +      finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
    +
    +      val sc = sparkContextRef.get()
    +      if (sc != null) {
    +        logInfo("Invoking sc stop from checkNumExecutorsFailed")
    +        sc.stop()
    +      }
    +    }
    +  }
    +
    +  /** Add the Yarn IP filter that is required for properly securing the UI. */
    +  private def addAmIpFilter() = {
    +    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    +    val proxy = client.getProxyHostAndPort(yarnConf)
    +    val parts = proxy.split(":")
    +    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
    +    val uriBase = "http://" + proxy + proxyBase
    +    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
    +
    +    if (isDriver) {
    +      System.setProperty("spark.ui.filters", amFilter)
    +      System.setProperty(s"spark.$amFilter.params", params)
    +    } else {
    +      actor ! AddWebUIFilter(amFilter, params, proxyBase)
    +    }
    +  }
    +
    +  private def startUserClass(): Thread = {
    +    logInfo("Starting the user JAR in a separate Thread")
    +    System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    val mainMethod = Class.forName(args.userClass, false,
    +      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
    +
    +    val t = new Thread {
    +      override def run() {
    +        try {
    +          // Copy
    +          val mainArgs = new Array[String](args.userArgs.size)
    +          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
    +          mainMethod.invoke(null, mainArgs)
    +          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
    +          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
    +          finalStatus = FinalApplicationStatus.SUCCEEDED
    +        } finally {
    +          logDebug("Finishing main")
    --- End diff --
    
    Yes, I removed too much code (just looked at the original).




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52859036
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19005/consoleFull) for PR 2020 at commit [`c0794be`](https://github.com/apache/spark/commit/c0794befcd6ae6e802fe432f46f163ebdd0f549a).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53478339
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19232/consoleFull) for PR 2020 at commit [`ff389ed`](https://github.com/apache/spark/commit/ff389ed2629309e8b97f22dd74571e32a0f482d4).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16684794
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,426 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    +      try {
    +        if (Thread.currentThread() != reporterThread) {
    +          reporterThread.join()
    +        }
    +      } finally {
    +        client.shutdown(status, Option(diagnostics).getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  private def sparkContextInitialized(sc: SparkContext) = {
    +    sparkContextRef.synchronized {
    +      sparkContextRef.compareAndSet(null, sc)
    +      sparkContextRef.notifyAll()
    +    }
    +  }
    +
    +  private def sparkContextStopped(sc: SparkContext) = {
    +    sparkContextRef.compareAndSet(sc, null)
    +  }
    +
    +  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
    +    val sc = sparkContextRef.get()
    +    allocator = client.register(yarnConf,
    +      if (sc != null) sc.getConf else sparkConf,
    +      if (sc != null) sc.preferredNodeLocationData else Map(),
    +      uiAddress,
    +      uiHistoryAddress)
    +
    +    allocator.allocateResources()
    +    reporterThread = launchReporterThread()
    +  }
    +
    +  private def runDriver(): Unit = {
    +    addAmIpFilter()
    +    val userThread = startUserClass()
    +
    +    // This is a bit hacky, but we need to wait until the spark.driver.port property has
    +    // been set by the Thread executing the user class.
    +    val sc = waitForSparkContextInitialized()
    +
    +    // If there is no SparkContext at this point, just fail the app.
    +    if (sc == null) {
    +      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
    +    } else {
    +      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
    +      try {
    +        userThread.join()
    +      } finally {
    +        // In cluster mode, ask the reporter thread to stop since the user app is finished.
    +        reporterThread.interrupt()
    +      }
    +    }
    +  }
    +
    +  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    +    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
    +      conf = sparkConf, securityManager = securityMgr)._1
    +    actor = waitForSparkDriver()
    +    addAmIpFilter()
    +    registerAM(sparkConf.get("spark.driver.appUIAddress", ""),
    +      sparkConf.get("spark.driver.appUIHistoryAddress", ""))
    +
    +    // In client mode the actor will stop the reporter thread.
    +    reporterThread.join()
    +    finalStatus = FinalApplicationStatus.SUCCEEDED
    +  }
    +
    +  private def launchReporterThread(): Thread = {
    +    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    +    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    +
    +    // we want to be reasonably responsive without causing too many requests to RM.
    +    val schedulerInterval =
    +      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
    +
    +    // must be <= expiryInterval / 2.
    +    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
    +
    +    val t = new Thread {
    +      override def run() {
    +        while (!finished) {
    +          checkNumExecutorsFailed()
    +          logDebug("Sending progress")
    +          allocator.allocateResources()
    +          try {
    +            Thread.sleep(interval)
    +          } catch {
    +            case e: InterruptedException =>
    +          }
    +        }
    +      }
    +    }
    +    // setting to daemon status, though this is usually not a good idea.
    +    t.setDaemon(true)
    +    t.setName("Reporter")
    +    t.start()
    +    logInfo("Started progress reporter thread - sleep time : " + interval)
    +    t
    +  }
    +
    +  /**
    +   * Clean up the staging directory.
    +   */
    +  private def cleanupStagingDir() {
    +    val fs = FileSystem.get(yarnConf)
    +    var stagingDirPath: Path = null
    +    try {
    +      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
    +      if (!preserveFiles) {
    +        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
    +        if (stagingDirPath == null) {
    +          logError("Staging directory is null")
    +          return
    +        }
    +        logInfo("Deleting staging directory " + stagingDirPath)
    +        fs.delete(stagingDirPath, true)
    +      }
    +    } catch {
    +      case ioe: IOException =>
    +        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
    +    }
    +  }
    +
    +  // Note: this needs to happen before allocateExecutors.
    --- End diff --
    
    Nit: this comment is not valid anymore.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53342189
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19150/consoleFull) for PR 2020 at commit [`0f5142c`](https://github.com/apache/spark/commit/0f5142cc8ae00cde0ac96c09ae28c528d316823f).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52821770
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18971/consoleFull) for PR 2020 at commit [`92770cc`](https://github.com/apache/spark/commit/92770ccdf1d9585842c717fe86c8193403c21cc5).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16420510
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -79,6 +84,21 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     }
     
     object YarnSparkHadoopUtil {
    +  // Additional memory overhead - in mb.
    +  val DEFAULT_MEMORY_OVERHEAD = 384
    +
    +  val ANY_HOST = "*"
    +
    +  // All RM requests are issued with same priority : we do not (yet) have any distinction between
    +  // request types (like map/reduce in hadoop for example)
    +  val RM_REQUEST_PRIORITY = 1
    +
    +  // Host to rack map - saved from allocation requests. We are expecting this not to change.
    +  // Note that it is possible for this to change : and ResurceManager will indicate that to us via
    --- End diff --
    
    ResourceManager spelling.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16676634
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
    @@ -173,4 +193,35 @@ object YarnSparkHadoopUtil {
         }
       }
     
    +  def lookupRack(conf: Configuration, host: String): String = {
    --- End diff --
    
    I haven't convinced myself this stuff really belongs in this class. I can see that it is Hadoop-related, and we don't have another Util class, so I think it's fine for now. I do think we want to make these routines private to Spark, though, since this class isn't currently private and we don't want normal users calling them.
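
The suggestion above, restricting the helper routines to Spark's own code, can be sketched roughly as follows. This is only an illustration under stated assumptions: the nested objects stand in for the `org.apache.spark` package hierarchy so the snippet compiles as a single file, and `HadoopUtilSketch`, `rackFor`, the `Map`-based lookup and the `/default-rack` fallback are hypothetical placeholders, not the code in the patch.

```scala
// Nested objects stand in for the org.apache.spark package hierarchy so the
// sketch compiles as one file. Every name below is hypothetical.
object spark {
  object yarn {
    object HadoopUtilSketch {
      // private[spark]: callable from anywhere inside `spark`, but hidden
      // from code that lives outside it (for example, user applications).
      private[spark] def lookupRack(rackByHost: Map[String, String], host: String): String =
        rackByHost.getOrElse(host, "/default-rack")
    }
  }

  object scheduler {
    // An internal caller. This compiles because it is nested inside `spark`.
    def rackFor(host: String): String =
      yarn.HadoopUtilSketch.lookupRack(Map("host1" -> "/rack1"), host)
  }
}
```

Code outside `spark` can still call `spark.scheduler.rackFor("host1")`, but a direct call to `spark.yarn.HadoopUtilSketch.lookupRack` from outside would be rejected by the compiler, which is the effect the comment asks for.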




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52556606
  
    I tried to keep the allocator changes minimal; a lot of it is just removing dead code and moving shared code to a shared place, but I did make some changes to make the alpha and stable interfaces work similarly.
    
    @tgravescs, it would be great if you guys could test the alpha changes; they compile, but I don't have a 0.23.x cluster to test them on. That would be greatly appreciated!
    
    I tested both client and cluster mode in stable yarn (CDH 5.1 = 2.3.0), including killing the AM and killing executors while jobs were running.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16430302
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala ---
    @@ -397,22 +390,37 @@ trait ClientBase extends Logging {
             .foreach(p => javaOpts += s"-Djava.library.path=$p")
         }
     
    -    // Command for the ApplicationMaster
    -    val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++
    -      javaOpts ++
    -      Seq(args.amClass, "--class", YarnSparkHadoopUtil.escapeForShell(args.userClass),
    -        "--jar ", YarnSparkHadoopUtil.escapeForShell(args.userJar),
    -        userArgsToString(args),
    -        "--executor-memory", args.executorMemory.toString,
    +    val userClass =
    +      if (args.userClass != null) {
    +        Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
    +      } else {
    +        Nil
    +      }
    +    val amClass =
    +      if (isLaunchingDriver) {
    +        classOf[ApplicationMaster].getName()
    +      } else {
    +        classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher")
    --- End diff --
    
    No, I left a dummy object in ApplicationMaster.scala just so that ps/jps show the same info as before.
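
A rough sketch of the "dummy object" idea mentioned above, assuming the simplest possible shape: a second entry point whose only job is to give the JVM a different main class name, so that ps/jps keep showing `ExecutorLauncher` in client mode. The `start` helper and its return value are hypothetical and exist only to make the delegation observable; the real entry point in the patch is not reproduced here.

```scala
// Simplified stand-in for the shared AM entry point.
object ApplicationMaster {
  // Hypothetical helper: returns the argument count so the delegation below
  // has something observable. The real AM does far more than this.
  def start(args: Array[String]): Int = args.length

  def main(args: Array[String]): Unit = {
    start(args)
    ()
  }
}

// Dummy entry point: contains no logic of its own. It exists so that the
// process launched in client mode reports "ExecutorLauncher" as its main class.
object ExecutorLauncher {
  def main(args: Array[String]): Unit = ApplicationMaster.main(args)
}
```

With this shape, the client can pick either class name for the launch command while both paths run the same code.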




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16676173
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala ---
    @@ -0,0 +1,67 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import scala.collection.{Map, Set}
    +
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +import org.apache.hadoop.yarn.api.records._
    +
    +import org.apache.spark.{SparkConf, SparkContext}
    +import org.apache.spark.scheduler.SplitInfo
    +
    +/**
    + * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
    + * is used by Spark's AM.
    + */
    +trait YarnRMClient {
    +
    +  /**
    +   * Registers the application master with the RM.
    +   *
    +   * @param conf The Yarn configuration.
    +   * @param sparkConf The Spark configuration.
    +   * @param preferredNodeLocations Map with hints about where to allocate containers.
    +   * @param uiAddress Address of the SparkUI.
    +   * @param uiHistoryAddress Address of the application on the History Server.
    +   */
    +  def register(
    +      conf: YarnConfiguration,
    +      sparkConf: SparkConf,
    +      preferredNodeLocations: Map[String, Set[SplitInfo]],
    +      uiAddress: String,
    +      uiHistoryAddress: String): YarnAllocator
    +
    +  /**
    +   * Shuts down the AM. Guaranteed to only be called once.
    +   *
    +   * @param registered Whether the AM was successfully registered with the RM.
    --- End diff --
    
    Can you fix the scaladoc? `registered` isn't a parameter.
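
One possible shape for the corrected scaladoc, as a sketch only. The parameter names `status` and `diagnostics` are inferred from the call `client.shutdown(status, Option(diagnostics).getOrElse(""))` quoted elsewhere in this thread; the exact signature in the patch is an assumption. `YarnRMClientSketch` and the local `FinalApplicationStatus` enumeration are hypothetical stand-ins so the snippet compiles without Hadoop on the classpath.

```scala
// Stand-in for org.apache.hadoop.yarn.api.records.FinalApplicationStatus,
// declared locally so the sketch has no Hadoop dependency.
object FinalApplicationStatus extends Enumeration {
  val UNDEFINED, SUCCEEDED, FAILED, KILLED = Value
}

// Hypothetical trait mirroring only the method under discussion.
trait YarnRMClientSketch {

  /**
   * Shuts down the AM. Guaranteed to only be called once.
   *
   * @param status The final status of the AM.
   * @param diagnostics Diagnostics message to include in the final status.
   */
  def shutdown(status: FinalApplicationStatus.Value, diagnostics: String): Unit
}
```

The point is simply that every `@param` tag names an actual parameter of the method it documents.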




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16731434
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    --- End diff --
    
    I was testing on stable too: it didn't report the diagnostics properly and didn't call sc.stop().





[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52659606
  
    I took one pass through this and made some minor comments. I also ran a few small tests on 0.23 and they worked OK.
    
    I want to take another more detailed pass over this though.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52566649
  
    Jenkins, test this please.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16738078
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    --- End diff --
    
    Fixed and re-tested (and re-merged). I generally run this in a loop to try this out:
    
        jps | grep CoarseGrained | awk '{print $1}' | xargs kill




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16679185
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,425 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      finalStatus = status
    +      reporterThread.interrupt()
    +      try {
    +        if (Thread.currentThread() != reporterThread) {
    +          reporterThread.join()
    +        }
    +      } finally {
    +        client.shutdown(status, Option(diagnostics).getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  private def sparkContextInitialized(sc: SparkContext) = {
    +    sparkContextRef.synchronized {
    +      sparkContextRef.compareAndSet(null, sc)
    +      sparkContextRef.notifyAll()
    +    }
    +  }
    +
    +  private def sparkContextStopped(sc: SparkContext) = {
    +    sparkContextRef.compareAndSet(sc, null)
    +  }
    +
    +  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
    +    val sc = sparkContextRef.get()
    +    allocator = client.register(yarnConf,
    +      if (sc != null) sc.getConf else sparkConf,
    +      if (sc != null) sc.preferredNodeLocationData else Map(),
    +      uiAddress,
    +      uiHistoryAddress)
    +
    +    allocator.allocateResources()
    +    reporterThread = launchReporterThread()
    +  }
    +
    +  private def runDriver(): Unit = {
    +    addAmIpFilter()
    +    val userThread = startUserClass()
    +
    +    // This a bit hacky, but we need to wait until the spark.driver.port property has
    +    // been set by the Thread executing the user class.
    +    val sc = waitForSparkContextInitialized()
    +
    +    // If there is no SparkContext at this point, just fail the app.
    +    if (sc == null) {
    +      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
    +    } else {
    +      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
    +      try {
    +        userThread.join()
    +      } finally {
    +        // In cluster mode, ask the reporter thread to stop since the user app is finished.
    +        reporterThread.interrupt()
    +      }
    +    }
    +  }
    +
    +  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    +    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
    +      conf = sparkConf, securityManager = securityMgr)._1
    +    actor = waitForSparkDriver()
    +    addAmIpFilter()
    +    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), "")
    --- End diff --
    
    I haven't tested this yet, but I don't see the history UI address being passed in here, so is it set properly when the application finishes?
    This was supported in client mode previously: sparkConf.get("spark.yarn.historyServer.address", "")
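
    A minimal sketch of the lookup mentioned in the comment above. It assumes a plain Map stands in for SparkConf so it runs without Spark on the classpath; the helper name historyAddressFor and the sample host are hypothetical, and only the key "spark.yarn.historyServer.address" comes from the comment.

```scala
object HistoryAddressSketch {
  // Stand-in for sparkConf.get(key, default); using a Map is an assumption
  // made purely for illustration.
  def historyAddressFor(conf: Map[String, String]): String =
    conf.getOrElse("spark.yarn.historyServer.address", "")

  def main(args: Array[String]): Unit = {
    // Hypothetical host and port, not taken from the pull request.
    val configured = Map("spark.yarn.historyServer.address" -> "historyhost:18080")
    println(historyAddressFor(configured))
    // With nothing configured the lookup falls back to the empty string,
    // which is what the quoted client-mode code passes unconditionally.
    println(historyAddressFor(Map.empty).isEmpty)
  }
}
```

    In this sketch, passing the looked-up value instead of a literal "" is the only difference between the two cases.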




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52659775
  
    For some reason GitHub isn't letting me comment on the files anymore, so here are a couple more comments.
    
    allocateExecutors() isn't being called.
    In ClientBase, amClass isn't used.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16549123
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    --- End diff --
    
    I don't completely follow this logic. finalStatus isn't set in many places, such as FAILED in the max-failures check, and it isn't set in the cleanup hook either, so isn't this going to return 1 even when the application succeeds?
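
    A minimal sketch of the exit-code rule being questioned, assuming a simplified stand-in for YARN's FinalApplicationStatus; the Status type and the exitCode helper are hypothetical names. It only mirrors the final if/else of run() in the quoted diff.

```scala
object ExitCodeSketch {
  // Simplified stand-in for FinalApplicationStatus (an assumption for illustration).
  sealed trait Status
  case object Undefined extends Status
  case object Succeeded extends Status
  case object Failed extends Status

  // Same shape as the end of run(): exit 0 only if some final status was recorded.
  def exitCode(finalStatus: Status): Int =
    if (finalStatus != Undefined) 0 else 1

  def main(args: Array[String]): Unit = {
    // A code path that finishes the work but never records a status
    // leaves the field at Undefined, so the process exits with 1.
    println(exitCode(Undefined))
    println(exitCode(Succeeded))
    // Any recorded status, including Failed, yields 0 under this rule.
    println(exitCode(Failed))
  }
}
```

    Under this rule the exit code reflects whether a status was recorded, not whether the application succeeded, which is the gap the comment points at.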




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16423385
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  private var finished = false
    +  private var registered = false
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +  private val userResult = new AtomicBoolean(false)
    +
    +  final def run(): Unit = {
    +    // Setup the directories so things go to YARN approved directories rather
    +    // than user specified and /tmp.
    +    System.setProperty("spark.local.dir", getLocalDirs())
    --- End diff --
    
    fyi this is going to conflict with https://github.com/apache/spark/pull/2002




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16423913
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,449 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  private var finished = false
    +  private var registered = false
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +  private val userResult = new AtomicBoolean(false)
    +
    +  final def run(): Unit = {
    +    // Setup the directories so things go to YARN approved directories rather
    +    // than user specified and /tmp.
    +    System.setProperty("spark.local.dir", getLocalDirs())
    +
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    // If this is the last attempt, register a shutdown hook to cleanup the staging dir
    +    // after the app is finished, in case it does not exit through the expected means.
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    if (isLastAttempt()) {
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          logInfo("AppMaster received a signal.")
    +          if (!finished) {
    +            cleanupStagingDir()
    +          }
    +        }
    +      }
    +      ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +    }
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    val success =
    +      try {
    +        if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
    +      } catch {
    +        case e: Exception =>
    +          logError("Exception while running AM main loop.", e)
    +          false
    +      }
    +
    +    finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
    +    val shouldCleanup = success || isLastAttempt()
    +    if (shouldCleanup) {
    +      cleanupStagingDir()
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status")
    --- End diff --
    
    I know you didn't change this, but can you add the diagnostics string to the log statement too?
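
    A minimal sketch of one way to fold the diagnostics string into the log line, assuming the status is passed as a plain String; the finishMessage helper is a hypothetical name. The Option-based pattern follows the one used in a later revision of this file, with an extra emptiness check because this revision defaults diagnostics to "".

```scala
object FinishLogSketch {
  // Builds the log line; a null or empty diagnostics string adds nothing to it.
  def finishMessage(status: String, diagnostics: String): String =
    s"Finishing ApplicationMaster with $status" +
      Option(diagnostics)
        .filter(_.nonEmpty)
        .map(msg => s" (diag message: $msg)")
        .getOrElse("")

  def main(args: Array[String]): Unit = {
    println(finishMessage("FAILED", "Timed out waiting for SparkContext."))
    println(finishMessage("SUCCEEDED", ""))
  }
}
```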




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16503725
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +  private val userResult = new AtomicBoolean(false)
    +
    +  final def run(): Unit = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    // If this is the last attempt, register a shutdown hook to cleanup the staging dir
    +    // after the app is finished, in case it does not exit through the expected means.
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    if (isLastAttempt()) {
    +      val cleanupHook = new Runnable {
    +        override def run() {
    +          logInfo("AppMaster received a signal.")
    +          if (!finished) {
    +            cleanupStagingDir()
    +          }
    +        }
    +      }
    +      ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +    }
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    val success =
    +      try {
    +        if (isDriver) runDriver() else runExecutorLauncher(securityMgr)
    +      } catch {
    +        case e: Exception =>
    +          logError("Exception while running AM main loop.", e)
    +          false
    +      }
    +
    +    finish(if (success) FinalApplicationStatus.SUCCEEDED else FinalApplicationStatus.FAILED)
    +    val shouldCleanup = success || isLastAttempt()
    +    if (shouldCleanup) {
    +      cleanupStagingDir()
    --- End diff --
    
    It will only rerun the AM if it doesn't exit cleanly. One such case is exiting with a non-zero status; others include crashing, failing to launch properly, failing to heartbeat, etc.
    
    A call to the RM to unregister counts as a clean exit. The state we are sending back is the application's final status, which is different from the YARN application state. If we send a final status, that means the application finished properly, so we don't need to worry about rerunning. The retries are really there to handle cases such as crashes or bad nodes.
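
    A rough sketch of the retry rule as described in the comment above. This is a toy model of the description, not YARN's actual implementation; AttemptEnd, its cases, and shouldRerun are all hypothetical names.

```scala
object AmRetrySketch {
  // How an AM attempt ended, following the cases listed in the comment.
  sealed trait AttemptEnd
  case object UnregisteredWithRM extends AttemptEnd // clean exit: a final status was sent
  case object NonZeroExit extends AttemptEnd
  case object Crashed extends AttemptEnd
  case object MissedHeartbeats extends AttemptEnd

  // A clean unregister never triggers a rerun, whatever final status was sent;
  // any other ending is retried while attempts remain.
  def shouldRerun(end: AttemptEnd, attempt: Int, maxAttempts: Int): Boolean =
    end match {
      case UnregisteredWithRM => false
      case _                  => attempt < maxAttempts
    }

  def main(args: Array[String]): Unit = {
    println(shouldRerun(UnregisteredWithRM, attempt = 1, maxAttempts = 2))
    println(shouldRerun(Crashed, attempt = 1, maxAttempts = 2))
    println(shouldRerun(Crashed, attempt = 2, maxAttempts = 2))
  }
}
```

    The point of the model is that the final status (SUCCEEDED or FAILED) plays no part in the rerun decision once the AM has unregistered.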



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16548976
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    +      finish(finalStatus)
    +      0
    +    } else {
    +      1
    +    }
    +  }
    +
    +  final def finish(status: FinalApplicationStatus, diagnostics: String = null) = synchronized {
    +    if (!finished) {
    +      logInfo(s"Finishing ApplicationMaster with $status"  +
    +        Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
    +      finished = true
    +      reporterThread.interrupt()
    +      try {
    +        if (Thread.currentThread() != reporterThread) {
    +          reporterThread.join()
    +        }
    +      } finally {
    +        client.shutdown(status, Option(diagnostics).getOrElse(""))
    +      }
    +    }
    +  }
    +
    +  private def sparkContextInitialized(sc: SparkContext) = {
    +    sparkContextRef.synchronized {
    +      sparkContextRef.compareAndSet(null, sc)
    +      sparkContextRef.notifyAll()
    +    }
    +  }
    +
    +  private def sparkContextStopped(sc: SparkContext) = {
    +    sparkContextRef.compareAndSet(sc, null)
    +  }
    +
    +  private def registerAM(uiAddress: String, uiHistoryAddress: String) = {
    +    val sc = sparkContextRef.get()
    +    allocator = client.register(yarnConf,
    +      if (sc != null) sc.getConf else sparkConf,
    +      if (sc != null) sc.preferredNodeLocationData else Map(),
    +      uiAddress,
    +      uiHistoryAddress)
    +
    +    allocator.allocateResources()
    +    reporterThread = launchReporterThread()
    +  }
    +
    +  private def runDriver(): Unit = {
    +    addAmIpFilter()
    +    val userThread = startUserClass()
    +
    +    // This a bit hacky, but we need to wait until the spark.driver.port property has
    +    // been set by the Thread executing the user class.
    +    val sc = waitForSparkContextInitialized()
    +
    +    // If there is no SparkContext at this point, just fail the app.
    +    if (sc == null) {
    +      finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
    +    } else {
    +      registerAM(sc.ui.appUIHostPort, YarnSparkHadoopUtil.getUIHistoryAddress(sc, sparkConf))
    +      try {
    +        userThread.join()
    +      } finally {
    +        // In cluster mode, ask the reporter thread to stop since the user app is finished.
    +        reporterThread.interrupt()
    +      }
    +    }
    +  }
    +
    +  private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
    +    actorSystem = AkkaUtils.createActorSystem("sparkYarnAM", Utils.localHostName, 0,
    +      conf = sparkConf, securityManager = securityMgr)._1
    +    actor = waitForSparkDriver()
    +    addAmIpFilter()
    +    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), "")
    +
    +    // In client mode the actor will stop the reporter thread.
    +    reporterThread.join()
    +    finalStatus = FinalApplicationStatus.SUCCEEDED
    +  }
    +
    +  private def launchReporterThread(): Thread = {
    +    // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses.
    +    val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
    +
    +    // we want to be reasonably responsive without causing too many requests to RM.
    +    val schedulerInterval =
    +      sparkConf.getLong("spark.yarn.scheduler.heartbeat.interval-ms", 5000)
    +
    +    // must be <= expiryInterval / 2.
    +    val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
    +
    +    val t = new Thread {
    +      override def run() {
    +        while (!finished) {
    +          checkNumExecutorsFailed()
    +          logDebug("Sending progress")
    +          allocator.allocateResources()
    +          try {
    +            Thread.sleep(interval)
    +          } catch {
    +            case e: InterruptedException =>
    +          }
    +        }
    +      }
    +    }
    +    // setting to daemon status, though this is usually not a good idea.
    +    t.setDaemon(true)
    +    t.setName("Reporter")
    +    t.start()
    +    logInfo("Started progress reporter thread - sleep time : " + interval)
    +    t
    +  }
    +
    +  /**
    +   * Clean up the staging directory.
    +   */
    +  private def cleanupStagingDir() {
    +    val fs = FileSystem.get(yarnConf)
    +    var stagingDirPath: Path = null
    +    try {
    +      val preserveFiles = sparkConf.get("spark.yarn.preserve.staging.files", "false").toBoolean
    +      if (!preserveFiles) {
    +        stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR"))
    +        if (stagingDirPath == null) {
    +          logError("Staging directory is null")
    +          return
    +        }
    +        logInfo("Deleting staging directory " + stagingDirPath)
    +        fs.delete(stagingDirPath, true)
    +      }
    +    } catch {
    +      case ioe: IOException =>
    +        logError("Failed to cleanup staging dir " + stagingDirPath, ioe)
    +    }
    +  }
    +
    +  // Note: this needs to happen before allocateExecutors.
    +  private def waitForSparkContextInitialized(): SparkContext = {
    +    logInfo("Waiting for spark context initialization")
    +    try {
    +      sparkContextRef.synchronized {
    +        var count = 0
    +        val waitTime = 10000L
    +        val numTries = sparkConf.getInt("spark.yarn.ApplicationMaster.waitTries", 10)
    +        while (sparkContextRef.get() == null && count < numTries && !finished) {
    +          logInfo("Waiting for spark context initialization ... " + count)
    +          count = count + 1
    +          sparkContextRef.wait(waitTime)
    +        }
    +
    +        val sparkContext = sparkContextRef.get()
    +        assert(sparkContext != null || count >= numTries)
    +        if (sparkContext == null) {
    +          logError(
    +            "Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d".format(
    +              count * waitTime, numTries))
    +        }
    +        sparkContext
    +      }
    +    }
    +  }
    +
    +  private def waitForSparkDriver(): ActorRef = {
    +    logInfo("Waiting for Spark driver to be reachable.")
    +    var driverUp = false
    +    val hostport = args.userArgs(0)
    +    val (driverHost, driverPort) = Utils.parseHostPort(hostport)
    +    while (!driverUp) {
    +      try {
    +        val socket = new Socket(driverHost, driverPort)
    +        socket.close()
    +        logInfo("Driver now available: %s:%s".format(driverHost, driverPort))
    +        driverUp = true
    +      } catch {
    +        case e: Exception =>
    +          logError("Failed to connect to driver at %s:%s, retrying ...".
    +            format(driverHost, driverPort))
    +          Thread.sleep(100)
    +      }
    +    }
    +    sparkConf.set("spark.driver.host", driverHost)
    +    sparkConf.set("spark.driver.port", driverPort.toString)
    +
    +    val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
    +      driverHost, driverPort.toString, CoarseGrainedSchedulerBackend.ACTOR_NAME)
    +    actorSystem.actorOf(Props(new MonitorActor(driverUrl)), name = "YarnAM")
    +  }
    +
    +  private def checkNumExecutorsFailed() = {
    +    if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
    +      finish(FinalApplicationStatus.FAILED, "Max number of executor failures reached.")
    +
    +      val sc = sparkContextRef.get()
    +      if (sc != null) {
    +        logInfo("Invoking sc stop from checkNumExecutorsFailed")
    +        sc.stop()
    +      }
    +    }
    +  }
    +
    +  /** Add the Yarn IP filter that is required for properly securing the UI. */
    +  private def addAmIpFilter() = {
    +    val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
    +    val proxy = client.getProxyHostAndPort(yarnConf)
    +    val parts = proxy.split(":")
    +    val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
    +    val uriBase = "http://" + proxy + proxyBase
    +    val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
    +
    +    if (isDriver) {
    +      System.setProperty("spark.ui.filters", amFilter)
    +      System.setProperty(s"spark.$amFilter.params", params)
    +    } else {
    +      actor ! AddWebUIFilter(amFilter, params, proxyBase)
    +    }
    +  }
    +
    +  private def startUserClass(): Thread = {
    +    logInfo("Starting the user JAR in a separate Thread")
    +    System.setProperty("spark.executor.instances", args.numExecutors.toString)
    +    val mainMethod = Class.forName(args.userClass, false,
    +      Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
    +
    +    val t = new Thread {
    +      override def run() {
    +        try {
    +          // Copy
    +          val mainArgs = new Array[String](args.userArgs.size)
    +          args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
    +          mainMethod.invoke(null, mainArgs)
    +          // Some apps have "System.exit(0)" at the end.  The user thread will stop here unless
    +          // it has an uncaught exception thrown out.  It needs a shutdown hook to set SUCCEEDED.
    +          finalStatus = FinalApplicationStatus.SUCCEEDED
    +        } finally {
    +          logDebug("Finishing main")
    --- End diff --
    
    This is now different from the original and doesn't set the status to FAILED if an exception is thrown?
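    
    A minimal sketch of the behavior being asked about, assuming the user class is invoked inside a try block. `Status`, its case objects and `runAndReport` are hypothetical stand-ins for this example, not the PR's code or Hadoop's `FinalApplicationStatus`.
    
    ```scala
    object UserClassStatus {
      sealed trait Status
      case object Succeeded extends Status
      case object Failed extends Status
    
      // Runs `body` (a stand-in for invoking the user's main method) and
      // returns the status that would be reported for it.
      def runAndReport(body: () => Unit): Status =
        try {
          body()
          Succeeded
        } catch {
          // Without this case, an exception would leave the status unset.
          case _: Exception => Failed
        }
    }
    ```
    
    With a catch clause like this, an exception in the user code maps to a failed status rather than leaving it undefined.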




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53589432
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19311/consoleFull) for   PR 2020 at commit [`3bbf3e7`](https://github.com/apache/spark/commit/3bbf3e75177b170177a12f27d9dae80d631ec03c).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2020




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52669404
  
    Jenkins, test this please.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53580677
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19311/consoleFull) for   PR 2020 at commit [`3bbf3e7`](https://github.com/apache/spark/commit/3bbf3e75177b170177a12f27d9dae80d631ec03c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52946303
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19061/consoleFull) for   PR 2020 at commit [`41f8c8a`](https://github.com/apache/spark/commit/41f8c8a2ef588c1ee90c58189d0c8892c88251e9).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53465198
  
    Other than the one comment, I think this looks good.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16549659
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.deploy.yarn
    +
    +import java.io.IOException
    +import java.net.Socket
    +import java.util.concurrent.atomic.AtomicReference
    +
    +import scala.collection.JavaConversions._
    +import scala.util.Try
    +
    +import akka.actor._
    +import akka.remote._
    +import org.apache.hadoop.conf.Configuration
    +import org.apache.hadoop.fs.{FileSystem, Path}
    +import org.apache.hadoop.util.ShutdownHookManager
    +import org.apache.hadoop.yarn.api._
    +import org.apache.hadoop.yarn.api.records._
    +import org.apache.hadoop.yarn.conf.YarnConfiguration
    +
    +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
    +import org.apache.spark.deploy.SparkHadoopUtil
    +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.AddWebUIFilter
    +import org.apache.spark.util.{AkkaUtils, SignalLogger, Utils}
    +
    +/**
    + * Common application master functionality for Spark on Yarn.
    + */
    +private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
    +  client: YarnRMClient) extends Logging {
    +  // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
    +  // optimal as more containers are available. Might need to handle this better.
    +  private val ALLOCATE_HEARTBEAT_INTERVAL = 100
    +
    +  private val sparkConf = new SparkConf()
    +  private val yarnConf: YarnConfiguration = new YarnConfiguration(new Configuration())
    +  private val isDriver = args.userClass != null
    +
    +  // Default to numExecutors * 2, with minimum of 3
    +  private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
    +    sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
    +
    +  @volatile private var finished = false
    +  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED
    +
    +  private var reporterThread: Thread = _
    +  private var allocator: YarnAllocator = _
    +
    +  // Fields used in client mode.
    +  private var actorSystem: ActorSystem = null
    +  private var actor: ActorRef = _
    +
    +  // Fields used in cluster mode.
    +  private val sparkContextRef = new AtomicReference[SparkContext](null)
    +
    +  final def run(): Int = {
    +    if (isDriver) {
    +      // Set the web ui port to be ephemeral for yarn so we don't conflict with
    +      // other spark processes running on the same box
    +      System.setProperty("spark.ui.port", "0")
    +
    +      // Set the master property to match the requested mode.
    +      System.setProperty("spark.master", "yarn-cluster")
    +    }
    +
    +    logInfo("ApplicationAttemptId: " + client.getAttemptId())
    +
    +    val cleanupHook = new Runnable {
    +      override def run() {
    +        // If the SparkContext is still registered, shut it down as a best case effort in case
    +        // users do not call sc.stop or do System.exit().
    +        val sc = sparkContextRef.get()
    +        if (sc != null) {
    +          logInfo("Invoking sc stop from shutdown hook")
    +          sc.stop()
    +          finish(FinalApplicationStatus.SUCCEEDED)
    +        }
    +
    +        // Cleanup the staging dir after the app is finished, or if it's the last attempt at
    +        // running the AM.
    +        val maxAppAttempts = client.getMaxRegAttempts(yarnConf)
    +        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
    +        if (finished || isLastAttempt) {
    +          cleanupStagingDir()
    +        }
    +      }
    +    }
    +    // Use priority 30 as it's higher than HDFS. It's the same priority MapReduce is using.
    +    ShutdownHookManager.get().addShutdownHook(cleanupHook, 30)
    +
    +    // Call this to force generation of secret so it gets populated into the
    +    // Hadoop UGI. This has to happen before the startUserClass which does a
    +    // doAs in order for the credentials to be passed on to the executor containers.
    +    val securityMgr = new SecurityManager(sparkConf)
    +
    +    if (isDriver) {
    +      runDriver()
    +    } else {
    +      runExecutorLauncher(securityMgr)
    +    }
    +
    +    if (finalStatus != FinalApplicationStatus.UNDEFINED) {
    --- End diff --
    
    Oops. I missed setting `finalStatus` in the `finish` method.
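    
    A sketch of what that fix might look like, under the assumption that `finish` is the single place where the status gets recorded. The `Am` class, `Status` hierarchy, `status` and `exitCode` below are hypothetical and simplified; they are not the code from this PR.
    
    ```scala
    object FinishSketch {
      sealed trait Status
      case object Undefined extends Status
      case object Succeeded extends Status
      case object Failed extends Status
    
      class Am {
        @volatile private var finished = false
        @volatile private var finalStatus: Status = Undefined
    
        // Records the status on the first call only; later calls are ignored.
        def finish(status: Status): Unit = synchronized {
          if (!finished) {
            finished = true
            finalStatus = status // the assignment the comment says was missing
          }
        }
    
        def status: Status = finalStatus
    
        // Mirrors the check at the end of run(): 0 once a final status is set.
        def exitCode: Int = if (finalStatus != Undefined) 0 else 1
      }
    }
    ```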




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52556322
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18783/consoleFull) for   PR 2020 at commit [`5100474`](https://github.com/apache/spark/commit/5100474aa46627e345951977d48e178ea793850f).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53580270
  
    Jenkins, test this please




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52853935
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19005/consoleFull) for   PR 2020 at commit [`c0794be`](https://github.com/apache/spark/commit/c0794befcd6ae6e802fe432f46f163ebdd0f549a).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
Github user tgravescs commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2020#discussion_r16676739
  
    --- Diff: yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---
    @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
     
     import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
     import org.apache.spark.{SparkException, Logging, SparkContext}
    -import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
    +import org.apache.spark.deploy.yarn.{ApplicationMaster, Client, ClientArguments, YarnSparkHadoopUtil}
    --- End diff --
    
    I don't see that ApplicationMaster is used here.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
GitHub user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52831134
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18971/consoleFull) for PR 2020 at commit [`92770cc`](https://github.com/apache/spark/commit/92770ccdf1d9585842c717fe86c8193403c21cc5).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
GitHub user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-53491638
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19232/consoleFull) for PR 2020 at commit [`ff389ed`](https://github.com/apache/spark/commit/ff389ed2629309e8b97f22dd74571e32a0f482d4).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!





[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by tgravescs <gi...@git.apache.org>.
GitHub user tgravescs commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52558229
  
    I'll take a look and hopefully try it out tomorrow. You didn't happen to change client mode to use an unmanaged AM, did you?




[GitHub] spark pull request: [SPARK-2933] [yarn] Refactor and cleanup Yarn ...

Posted by SparkQA <gi...@git.apache.org>.
GitHub user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2020#issuecomment-52678030
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18855/consoleFull) for PR 2020 at commit [`ecaf332`](https://github.com/apache/spark/commit/ecaf332ba2ad02c2fc49ffefb7f17eb756a4bc23).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.

