Posted to reviews@spark.apache.org by steveloughran <gi...@git.apache.org> on 2015/10/20 17:01:27 UTC

[GitHub] spark pull request: SPARK-1537: pure service API and test service

GitHub user steveloughran opened a pull request:

    https://github.com/apache/spark/pull/9182

    SPARK-1537: pure service API and test service

    This is purely the yarn/src/main and yarn/src/test parts of the YARN ATS integration: the extension model to load and run implementations of `SchedulerExtensionService` in the YARN cluster scheduler process, and to stop them afterwards.

    There's duplication between the two schedulers, yarn-client and yarn-cluster, at least in terms of setting everything up, because the common superclass, `YarnSchedulerBackend`, is in spark-core, and the extension services need the YARN app/attempt IDs.

    If you look at how the extension services are loaded, the case class `SchedulerExtensionServiceBinding` is used to pass in config info: currently just the Spark context and the YARN IDs, of which one, the attempt ID, will be unset when running client-side. Passing in a case class ensures that extra arguments can be added to the binding in future while the method signature stays the same, so existing services will still load.

    There's no functional extension service here, just one for testing; the real tests come in the bigger pull requests. At the same time, this extension mechanism isn't restricted to the ATS history publisher: anything else that wants to listen to the Spark context and publish events could use it. I'd also consider writing one for the YARN-913 registry service, so that the URLs of the web UI would be locatable through that (low priority; it would make more sense if integrated with a REST client).

    There's no minicluster test. Given the execution overhead of setting up minicluster tests, it'd probably be better to add an extension service into one of the existing tests.
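
    To make the model concrete, here's a minimal sketch of an implementation. The class name `LoggingExtensionService` is hypothetical, purely for illustration; it relies only on the trait and binding defined in this patch:

    ```
    package org.apache.spark.scheduler.cluster

    import java.util.concurrent.atomic.AtomicBoolean

    import org.apache.spark.Logging

    // Hypothetical service: logs its lifecycle events and nothing more.
    class LoggingExtensionService extends SchedulerExtensionService with Logging {
      private val started = new AtomicBoolean(false)

      override def start(binding: SchedulerExtensionServiceBinding): Unit = {
        // no-op if called more than once, per the trait contract
        if (!started.getAndSet(true)) {
          logInfo(s"Bound to app ${binding.applicationId}; attempt ${binding.attemptId}")
        }
      }

      // stop() is idempotent and succeeds even if start() was never invoked
      override def stop(): Unit = {
        if (started.getAndSet(false)) {
          logInfo("Stopped")
        }
      }
    }
    ```

    A class like this would be enabled by listing its name in the `spark.yarn.services` configuration property.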

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/steveloughran/spark stevel/feature/SPARK-1537-service

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9182.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9182
    
----
commit e796bb33d3128587f87529284fe45b37e1205de5
Author: Steve Loughran <st...@hortonworks.com>
Date:   2015-10-20T14:35:40Z

    SPARK-1537: pure service API and test service

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151877402
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44524/
    Test PASSed.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43251937
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    --- End diff --
    
    nit: this goes before the previous import



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43254504
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala ---
    @@ -0,0 +1,50 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +
    --- End diff --
    
    nit: nuke the extra empty lines.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152164469
  
    **[Test build #44593 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44593/consoleFull)** for PR 9182 at commit [`8a6a1f1`](https://github.com/apache/spark/commit/8a6a1f13235fd00dcc58c4106b0314098f961e67).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43284929
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-side schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    +    super.start()
    +  }
    +
    +  override def stop(): Unit = {
    +    super.stop()
    --- End diff --
    
    Good question about what to do with exceptions. I think the best approach is to have each `stop()` called from a method that catches and logs exceptions. That way you solve two problems at once: you won't mask any exception thrown by `super.stop()`, and you'll make sure `stop()` is called on every service (which the current code doesn't do when an exception is thrown).
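
    A sketch of that pattern (illustrative only; this would live in `SchedulerExtensionServices`, which mixes in `Logging`):

    ```
    // Stop every child service, logging failures instead of propagating them,
    // so one bad service neither masks another exception nor blocks the
    // remaining services from being stopped.
    override def stop(): Unit = {
      services.foreach { s =>
        try {
          s.stop()
        } catch {
          case e: Exception => logWarning(s"While stopping $s", e)
        }
      }
    }
    ```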



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151979760
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44536/
    Test PASSed.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43373740
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by [[SchedulerExtensionServices]],
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = {
    --- End diff --
    
    OK



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151914677
  
     Build triggered.



[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-149602761
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152164064
  
    **[Test build #44593 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44593/consoleFull)** for PR 9182 at commit [`8a6a1f1`](https://github.com/apache/spark/commit/8a6a1f13235fd00dcc58c4106b0314098f961e67).



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219575
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-side schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYARN(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    --- End diff --
    
    `bindToYarn`?



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43468563
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -17,17 +17,17 @@
     
     package org.apache.spark.scheduler.cluster
     
    -import scala.collection.mutable.ArrayBuffer
    -import scala.concurrent.{Future, ExecutionContext}
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
     
     import org.apache.spark.{Logging, SparkContext}
     import org.apache.spark.rpc._
    --- End diff --
    
    Do we need this import?



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152162358
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r44824618
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    --- End diff --
    
    super nit: stray empty line



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152188739
  
     Merged build triggered.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151917268
  
    **[Test build #44536 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44536/consoleFull)** for PR 9182 at commit [`32b8655`](https://github.com/apache/spark/commit/32b86557c790daed45b7b1d3e47fbeba76e91f69).



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43252081
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    --- End diff --
    
    When will this be not set? I assume in client mode? Could you mention that in the scaladoc above?



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43358929
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by [[SchedulerExtensionServices]],
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = {
    --- End diff --
    
    Just a suggestion in general (you don't need to change that here if you don't want to), but you can use a shorter syntax in these cases:
    
        def getServices: List[SchedulerExtensionService] = services




[GitHub] spark pull request: [SPARK-11314] [YARN] WiP: add service API and ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151220021
  
    Merged build started.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161861637
  
    It looks like some MiMa tests are now failing in the Hadoop pre-YARN builds:
    
    ```
    [info] spark-core: found 1 potential binary incompatibilities (filtered 715)
    [error]  * class org.apache.spark.scheduler.cluster.YarnSchedulerBackend#YarnSchedulerEndpoint does not have a correspondent in new version
    [error]    filter with: ProblemFilters.exclude[MissingClassProblem]
    ```
    
    https://amplab.cs.berkeley.edu/jenkins/job/Spark-Master-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/4165/console
    
    Could you take a quick look?
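
    For reference, if dropping the class is intentional, the usual fix is an exclusion entry in `project/MimaExcludes.scala`. A sketch, assuming `com.typesafe.tools.mima.core._` is in scope, with placement depending on the release section:

    ```
    // Exclude the private class reported above from binary-compatibility checks.
    ProblemFilters.exclude[MissingClassProblem](
      "org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint")
    ```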
    




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155932349
  
    **[Test build #45681 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45681/consoleFull)** for PR 9182 at commit [`810cb75`](https://github.com/apache/spark/commit/810cb7591762fd5f325a9023690edc17b7566dda).



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161090381
  
    A few minor things left, otherwise looks ok.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43253567
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +          val instance = Utils.classForName(sClass)
    +            .newInstance()
    +            .asInstanceOf[SchedulerExtensionService]
    +          // bind this service
    +          instance.start(binding)
    +          logInfo(s"Service $sClass started")
    +          instance
    +        }
    +    }.map(_.toList).getOrElse(Nil)
    --- End diff --
    
    minor: instead of another call to `map` you could add the `toList` call to the code inside the previous closure.
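
    That is, roughly (a sketch over the diff above):

    ```
    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
      .map { s =>
        s.split(",").map(_.trim()).filter(!_.isEmpty)
          .map { sClass =>
            val instance = Utils.classForName(sClass)
              .newInstance()
              .asInstanceOf[SchedulerExtensionService]
            // bind this service
            instance.start(binding)
            logInfo(s"Service $sClass started")
            instance
          }.toList  // toList inside the closure replaces the trailing .map(_.toList)
      }.getOrElse(Nil)
    ```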



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43378157
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    But do you need the parsed information? e.g. `ApplicationId` has a "cluster timestamp" and an id; I don't see much use in providing those separately to these services, the string id seems good enough in my view.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161740895
  
    Merging to master.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155968443
  
    Build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152536617
  
    OK, the new patch uses String values for the app ID and app attempt ID in the service extension binding, but explicitly derives these from the YARN app and attempt IDs, so that services and downstream code can be confident the values will be locatable in the YARN services (RM, NM, etc.).

    Tests (here and in the dependent pull requests) have all been updated to match.

    I've moved the stub app/attempt ID classes out, as they are now only needed for integration tests in the final history provider patch.
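
    Schematically, the revised binding looks something like this (a sketch, not the exact patch; the String values are always derived from the real YARN records):

    ```
    case class SchedulerExtensionServiceBinding(
        sparkContext: SparkContext,
        applicationId: String,             // from ApplicationId.toString
        attemptId: Option[String] = None)  // from ApplicationAttemptId.toString
    ```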



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151931420
  
    rebase to follow



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43281460
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -17,17 +17,17 @@
     
     package org.apache.spark.scheduler.cluster
     
    -import scala.collection.mutable.ArrayBuffer
    -import scala.concurrent.{Future, ExecutionContext}
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
     
    -import org.apache.spark.{Logging, SparkContext}
     import org.apache.spark.rpc._
    -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.ui.JettyUtils
    -import org.apache.spark.util.{ThreadUtils, RpcUtils}
    -
    -import scala.util.control.NonFatal
    +import org.apache.spark.util.{RpcUtils, ThreadUtils}
    +import org.apache.spark.{Logging, SparkContext}
    --- End diff --
    
    Uh oh, it looks like IDEA may be imposing a different sort order than expected, especially on `{` chars.
    
    This is what the IDE-generated order looks like, based on the instructions in the spark style guide:
    
    ```
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.control.NonFatal
    
    import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    
    import org.apache.spark.rpc._
    import org.apache.spark.scheduler._
    import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
    import org.apache.spark.ui.JettyUtils
    import org.apache.spark.util.{RpcUtils, ThreadUtils}
    import org.apache.spark.{Logging, SparkContext}
    ```
    
    What's it getting wrong? As in: "what will I have to review all my source files for?"


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43246410
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala ---
    @@ -0,0 +1,50 @@
    +/*
    --- End diff --
    
    I'm using them more in the tests in the later patches. I can (and will) move them into the test helper, but rest assured, there are a lot more tests to come.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46336258
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
    +
    +/**
    + * Test the integration with [[SchedulerExtensionServices]]
    + */
    +class ExtensionServiceIntegrationSuite extends SparkFunSuite
    +  with BeforeAndAfter
    +  with Logging {
    +
    +  val applicationId = new StubApplicationId(0, 1111L)
    +  val attemptId = new StubApplicationAttemptId(applicationId, 1)
    +  var sparkCtx: SparkContext = _
    +
    +  /*
    +   * Setup phase creates the spark context
    +   */
    +  before {
    +    val sparkConf = new SparkConf()
    +    sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
    +      classOf[SimpleExtensionService].getName())
    +    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
    +    sparkCtx = new SparkContext(sparkConf)
    --- End diff --
    
    You can probably use `LocalSparkContext` instead of rolling your own code for cleaning this up.
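
    For illustration, a sketch of the suite on top of that trait (assuming Spark's `LocalSparkContext` test mixin, which owns the `sc` field and stops it after each test):

    ```
    import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}

    // Sketch only: LocalSparkContext handles teardown, so the suite needs no
    // custom cleanup code of its own.
    class ExtensionServiceIntegrationSuite extends SparkFunSuite with LocalSparkContext {

      override def beforeEach(): Unit = {
        super.beforeEach()
        val conf = new SparkConf()
          .set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
            classOf[SimpleExtensionService].getName())
          .setMaster("local")
          .setAppName("ExtensionServiceIntegrationSuite")
        sc = new SparkContext(conf) // stopped automatically in afterEach()
      }
    }
    ```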


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219717
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala ---
    @@ -0,0 +1,50 @@
    +/*
    --- End diff --
    
    Can we put these fake stub classes into one file, like `SparkYarnTestHelper` or something similar? That would reduce the number of files.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152535453
  
    **[Test build #44682 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44682/consoleFull)** for PR 9182 at commit [`fa6b9b1`](https://github.com/apache/spark/commit/fa6b9b11159a06bedcef0ce36e0c864cd947a5de).


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46433390
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
    + *                  cluster mode.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var serviceOption: Option[String] = None
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
    +
    +    serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +    services = serviceOption
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = services
    +
    +  /**
    +   * Stop the services; idempotent.
    +   *
    +   */
    +  override def stop(): Unit = {
    +    if (started.getAndSet(false)) {
    +      logInfo(s"Stopping $this")
    +      services.foreach { s =>
    +        try {
    --- End diff --
    
    done. Will look for other uses elsewhere too
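
    For reference, a sketch of the resulting `stop()` shape, assuming `Utils.tryLogNonFatalError` is the helper being adopted here:

    ```
    override def stop(): Unit = {
      if (started.getAndSet(false)) {
        logInfo(s"Stopping $this")
        services.foreach { s =>
          // Log-and-continue: one failing service must not block the others.
          Utils.tryLogNonFatalError {
            s.stop()
          }
        }
      }
    }
    ```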


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219322
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    sparkContext = binding.sparkContext
    +    appId = binding.applicationId
    +    attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +        s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +        .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +                .newInstance()
    --- End diff --
    
    Do we need to try/catch some exceptions, like `ClassNotFoundException`, here?
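
    For what it's worth, a sketch of what such a guard could look like (the wrapped error messages are illustrative, not from the patch):

    ```
    val instance = try {
      Utils.classForName(sClass)
        .newInstance()
        .asInstanceOf[SchedulerExtensionService]
    } catch {
      // Surface a clearer failure than a raw reflection error when the
      // configured class is missing or of the wrong type.
      case e: ClassNotFoundException =>
        throw new IllegalArgumentException(
          s"Cannot find extension service class $sClass", e)
      case e: ClassCastException =>
        throw new IllegalArgumentException(
          s"Class $sClass is not a SchedulerExtensionService", e)
    }
    ```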


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161428373
  
    **[Test build #47072 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47072/consoleFull)** for PR 9182 at commit [`b9a1834`](https://github.com/apache/spark/commit/b9a183467e529dbde79bbf1548c0c921eba18d0e).
     * This patch **fails from timeout after a configured wait of `250m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161445985
  
    LGTM pending tests.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152164473
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-149594189
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151979572
  
    **[Test build #44536 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44536/consoleFull)** for PR 9182 at commit [`32b8655`](https://github.com/apache/spark/commit/32b86557c790daed45b7b1d3e47fbeba76e91f69).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43279925
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    --- End diff --
    
    I think I'd tried to manually edit it from Jerry's comment and got it wrong. Will try the IDE this time.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43284627
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -17,17 +17,17 @@
     
     package org.apache.spark.scheduler.cluster
     
    -import scala.collection.mutable.ArrayBuffer
    -import scala.concurrent.{Future, ExecutionContext}
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
     
    -import org.apache.spark.{Logging, SparkContext}
     import org.apache.spark.rpc._
    -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.ui.JettyUtils
    -import org.apache.spark.util.{ThreadUtils, RpcUtils}
    -
    -import scala.util.control.NonFatal
    +import org.apache.spark.util.{RpcUtils, ThreadUtils}
    +import org.apache.spark.{Logging, SparkContext}
    --- End diff --
    
    The thing it's getting wrong is that it's listing child packages (e.g. `org.apache.spark.rpc`) before the parent one (`org.apache.spark`). I'm not sure how to configure IDEs to handle that (I don't use one myself).
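
    For comparison, a sketch of the order that rule implies for this block (the parent package group ahead of its children):

    ```
    import scala.concurrent.{ExecutionContext, Future}
    import scala.util.control.NonFatal

    import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}

    import org.apache.spark.{Logging, SparkContext}
    import org.apache.spark.rpc._
    import org.apache.spark.scheduler._
    import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
    import org.apache.spark.ui.JettyUtils
    import org.apache.spark.util.{RpcUtils, ThreadUtils}
    ```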


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152781453
  
    So my whole feedback about the IDs wasn't about the type; it was that if you could use the existing values, you wouldn't need to add all the code for plumbing the values from the ApplicationMaster (or wherever else), and you wouldn't need that `bindToYarn` method you're adding.
    
    If you really need the full attempt ID for the service you're adding later, then you still need all that code, and it doesn't matter whether you expose a String or the actual type.
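
    For context, a sketch of the alternative being described, under the assumption that `sc.applicationId` (a String the YARN scheduler backends already populate) is one of those existing values; the case class and method names are hypothetical:

    ```
    // Hypothetical binding built only from values the SparkContext already
    // exposes, avoiding extra plumbing from the ApplicationMaster.
    case class StringServiceBinding(
        sparkContext: SparkContext,
        applicationId: String,             // from sc.applicationId
        attemptId: Option[String] = None)  // absent in client mode

    def bindFromContext(sc: SparkContext): StringServiceBinding =
      StringServiceBinding(sc, sc.applicationId)
    ```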


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152162385
  
    Merged build started.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152532383
  
     Merged build triggered.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43377401
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by [[SchedulerExtensionServices]],
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = {
    +    services
    +  }
    +
    +  /**
    +   * Stop the services; idempotent.
    +    *
    +    * Any
    +   */
    +  override def stop(): Unit = {
    +    if (started.getAndSet(false)) {
    +      logInfo(s"Stopping $this")
    --- End diff --
    
    added one


[GitHub] spark pull request: [SPARK-11314] [YARN] WiP: add service API and ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151262911
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44368/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151975471
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44537/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219421
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---
    @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
     
     import org.apache.spark.SparkContext
     import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
    +import org.apache.spark.deploy.yarn.ApplicationMaster
    --- End diff --
    
    nit: this import can be merged with the one above.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155968446
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/45681/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152559523
  
    **[Test build #44682 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44682/consoleFull)** for PR 9182 at commit [`fa6b9b1`](https://github.com/apache/spark/commit/fa6b9b11159a06bedcef0ce36e0c864cd947a5de).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-149602764
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43985/
    Test PASSed.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151825730
  
     Build triggered.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r44825346
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var serviceOption: Option[String] = None
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +    services = serviceOption
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = services
    +
    +  /**
    +   * Stop the services; idempotent.
    +    *
    +    * Any
    --- End diff --
    
    nit: nuke this line and the previous one? (they're also indented wrongly.)


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152559610
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44682/
    Test FAILed.


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r44824670
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var serviceOption: Option[String] = None
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    --- End diff --
    
    nit: if you use `$appId` then the whole statement fits in the same line.
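    
    i.e., with the vals already extracted above:
    
        logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")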




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43439082
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    For the history service: yes. What I didn't want to do was write an extension service which suited exactly one use case though. 
    
    Still, there's not much you can do with an appId that doesn't involve the YARN client or AM/RM RPC channels, and if some extension wants to do that, it'll probably need more than just the ID structures. For other uses (e.g. grabbing the app reports, looking things up in the RM web UI, etc.), those string values will work.
    
    I'll go to strings.
    
    W.r.t. the existing methods, the application attempt ID that exists today is just the tail of the (parsed) app attempt string: "1", "2", .... For the history stuff I do need the full attemptId for its uniqueness.
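    
    A sketch of what the string-typed binding would look like (field names kept from the current case class):
    
        case class SchedulerExtensionServiceBinding(
            sparkContext: SparkContext,
            applicationId: String,
            attemptId: Option[String] = None)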




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43363947
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    (update: I see that doing that would mean changing types, but do you need to expose the YARN types to the service? Is there a method to parse the String into the YARN type if needed?)
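    
    For reference, a sketch of the round trip, assuming Hadoop 2.x's `ConverterUtils` (the ID strings below are made-up examples):
    
        import org.apache.hadoop.yarn.util.ConverterUtils
    
        // parse the string forms back into the YARN record types when a service needs them
        val appId = ConverterUtils.toApplicationId("application_1446203436050_0001")
        val attemptId = ConverterUtils.toApplicationAttemptId("appattempt_1446203436050_0001_000001")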




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43220093
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    --- End diff --
    
    Here `binding` actually duplicates the three fields below; from my understanding of this code, we could keep either one.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152559608
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219638
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +
    --- End diff --
    
    nit: one extra empty line here.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43254282
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
    +
    +/**
    + * Test the integration with [[SchedulerExtensionServices]]
    + */
    +class ExtensionServiceIntegrationSuite extends SparkFunSuite
    +  with BeforeAndAfter
    +  with Logging {
    +
    +  val applicationId = new StubApplicationId(0, 1111L)
    +  val attemptId = new StubApplicationAttemptId(applicationId, 1)
    +  var sparkCtx: SparkContext = _
    +
    +  /*
    +   * Setup phase creates the spark context
    +   */
    +  before {
    +    val sparkConf = new SparkConf()
    +    sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
    +      "org.apache.spark.scheduler.cluster.SimpleExtensionService")
    --- End diff --
    
    nit: `classOf[SimpleExtensionService].getName()`
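    
    i.e.:
    
        sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
          classOf[SimpleExtensionService].getName())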




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152532420
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43359060
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,155 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by [[SchedulerExtensionServices]],
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = {
    +    services
    +  }
    +
    +  /**
    +   * Stop the services; idempotent.
    +    *
    +    * Any
    +   */
    +  override def stop(): Unit = {
    +    if (started.getAndSet(false)) {
    +      logInfo(s"Stopping $this")
    --- End diff --
    
    This class doesn't have a `toString` so this will probably look ugly.
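    
    A sketch of an override that would make that line readable (the exact field choice is a matter of taste):
    
        override def toString(): String =
          s"SchedulerExtensionServices(services=$services, started=$started)"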


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11314] [YARN] WiP: add service API and ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151262676
  
    **[Test build #44368 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44368/consoleFull)** for PR 9182 at commit [`d018a38`](https://github.com/apache/spark/commit/d018a38c019906e26b9ab2371e5c1e6f1b2d2faa).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161434959
  
    **[Test build #47084 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47084/consoleFull)** for PR 9182 at commit [`b9a1834`](https://github.com/apache/spark/commit/b9a183467e529dbde79bbf1548c0c921eba18d0e).




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155968349
  
    **[Test build #45681 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/45681/consoleFull)** for PR 9182 at commit [`810cb75`](https://github.com/apache/spark/commit/810cb7591762fd5f325a9023690edc17b7566dda).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43281918
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala ---
    @@ -0,0 +1,88 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
    +
    +/**
    + * Test the integration with [[SchedulerExtensionServices]]
    + */
    +class ExtensionServiceIntegrationSuite extends SparkFunSuite
    +  with BeforeAndAfter
    +  with Logging {
    +
    +  val applicationId = new StubApplicationId(0, 1111L)
    +  val attemptId = new StubApplicationAttemptId(applicationId, 1)
    +  var sparkCtx: SparkContext = _
    +
    +  /*
    +   * Setup phase creates the spark context
    +   */
    +  before {
    +    val sparkConf = new SparkConf()
    +    sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
    +      "org.apache.spark.scheduler.cluster.SimpleExtensionService")
    +    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
    +    sparkCtx = new SparkContext(sparkConf)
    +  }
    +
    +  /*
    +   * Teardown stops all services and the VM-wide spark context
    +   */
    +  after {
    +    stopSparkContext()
    +  }
    +
    +  protected def stopSparkContext(): Unit = {
    +    if (sparkCtx != null) {
    +      logDebug("Stopping spark context")
    +      sparkCtx.stop()
    +      sparkCtx = null
    +    }
    +  }
    +
    +
    --- End diff --
    
    whitespace line; will fix




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161356666
  
    **[Test build #47072 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47072/consoleFull)** for PR 9182 at commit [`b9a1834`](https://github.com/apache/spark/commit/b9a183467e529dbde79bbf1548c0c921eba18d0e).




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43358778
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    --- End diff --
    
    Still out of order.
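    
    The expected ordering, as it ends up in the later revision of this file:
    
        import org.apache.spark.{Logging, SparkContext}
        import org.apache.spark.util.Utils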




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152188763
  
    Merged build started.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152192812
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44601/
    Test FAILed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43254205
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-side schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    +    super.start()
    +  }
    +
    +  override def stop(): Unit = {
    +    super.stop()
    --- End diff --
    
    super minor, but maybe do a try..finally here just in case `super.stop()` throws?
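    
    i.e. something like (assuming the `services.stop()` call that follows in the full diff):
    
        override def stop(): Unit = {
          try {
            super.stop()
          } finally {
            services.stop()
          }
        }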




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151827066
  
    **[Test build #44524 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44524/consoleFull)** for PR 9182 at commit [`a4358d5`](https://github.com/apache/spark/commit/a4358d5b23dd2d7db706574124e4c69d1171ffb4).




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43280181
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +          val instance = Utils.classForName(sClass)
    +            .newInstance()
    --- End diff --
    
    I can add that in the docs. Or, what if I converted `SchedulerExtensionService` from a trait to a class and added an empty constructor there, with the documentation on that constructor?
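    
    Either way, the loading contract on implementations stays the same; a minimal sketch of a loadable service (hypothetical name), where the no-arg constructor is what `Utils.classForName(...).newInstance()` relies on:
    
        class AuditingExtensionService extends SchedulerExtensionService with Logging {
          // no-arg constructor: required so the container can instantiate it reflectively
          override def start(binding: SchedulerExtensionServiceBinding): Unit = {
            logInfo(s"bound to app ${binding.applicationId}")
          }
          // must be idempotent and succeed even if start() was never invoked
          override def stop(): Unit = ()
        }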




[GitHub] spark pull request: [SPARK-11314] [YARN] WiP: add service API and ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151220003
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161433043
  
    retest this please




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152164476
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44593/
    Test FAILed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151845427
  
    Looks OK to me, mostly just style nits. Also, needs a rebase.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43246322
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala ---
    @@ -0,0 +1,39 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +
    --- End diff --
    
    got it




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43246543
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    --- End diff --
    
    OK, saving binding as a field; converting the others to local vars.
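
    Roughly, as a sketch (the names all come from the diff above; only `binding` stays a field):

        def start(binding: SchedulerExtensionServiceBinding): Unit = {
          if (started.getAndSet(true)) {
            logWarning("Ignoring re-entrant start operation")
            return
          }
          require(binding.sparkContext != null, "Null context parameter")
          require(binding.applicationId != null, "Null appId parameter")
          this.binding = binding
          // locals rather than fields; nothing but the binding outlives start()
          val sparkContext = binding.sparkContext
          val attemptId = binding.attemptId
          logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
            s" and attemptId $attemptId")
          services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
            .map { s =>
              s.split(",").map(_.trim()).filter(_.nonEmpty).map { sClass =>
                val instance = Utils.classForName(sClass)
                  .newInstance()
                  .asInstanceOf[SchedulerExtensionService]
                // bind this service
                instance.start(binding)
                logInfo(s"Service $sClass started")
                instance
              }.toList
            }.getOrElse(Nil)
        }

    For anyone wiring this up: implementations are registered by class name, e.g.
    `spark.yarn.services=org.example.MyExtensionService` (a made-up name), and they
    need an empty constructor so the container can instantiate them reflectively.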




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161428496
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47072/
    Test FAILed.




[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-149594143
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11314] [YARN] WiP: add service API and ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151262907
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161460950
  
    **[Test build #47084 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/47084/consoleFull)** for PR 9182 at commit [`b9a1834`](https://github.com/apache/spark/commit/b9a183467e529dbde79bbf1548c0c921eba18d0e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161461070
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/47084/
    Test PASSed.




[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-150689362
  
    Will do. Then I can combine everything nicely.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161428493
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151937732
  
    **[Test build #44537 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44537/consoleFull)** for PR 9182 at commit [`8171a98`](https://github.com/apache/spark/commit/8171a982ddbed930227583243b230bd35d89d047).




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43281778
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-side schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    +    super.start()
    +  }
    +
    +  override def stop(): Unit = {
    +    super.stop()
    --- End diff --
    
    Always good to be cautious. Now, should I try to be clever in that `finally` clause and downgrade any service-stop exceptions into logged events? That way, if the superclass did raise something, it wouldn't get lost to a second exception thrown from the finally clause?
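
    Something like this, say (just a sketch; `NonFatal` is already imported in this file):

        override def stop(): Unit = {
          try {
            super.stop()
          } finally {
            try {
              services.stop()
            } catch {
              // log rather than rethrow, so a failure here can't mask
              // anything super.stop() already threw
              case NonFatal(e) => logWarning("Error stopping extension services", e)
            }
          }
        }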




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151825741
  
    Build started.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161107529
  
    Thanks; will deal with these on Wednesday.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151975265
  
    **[Test build #44537 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44537/consoleFull)** for PR 9182 at commit [`8171a98`](https://github.com/apache/spark/commit/8171a982ddbed930227583243b230bd35d89d047).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r44824859
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID -if known.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var serviceOption: Option[String] = None
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    --- End diff --
    
    `getConf` creates a copy of the conf; you can use `sparkContext.conf`, which doesn't.
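
    i.e., as a one-line sketch:

        serviceOption = sparkContext.conf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)

    `getConf` clones the SparkConf on every call, which is wasted work for a read-only lookup.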




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43244882
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    sparkContext = binding.sparkContext
    +    appId = binding.applicationId
    +    attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +        s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +        .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +                .newInstance()
    +                .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }
    +    }.map(_.toList).getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = {
    +    services
    +  }
    +
    +  override def stop(): Unit = {
    +    logInfo(s"Stopping $this")
    --- End diff --
    
    Good catch; I'll set the flag and skip the operation if `!started`. This will permit reissuing the `start()` call on a stopped service, but a full state machine here is probably overkill.
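
    Sketch of the guarded stop; flipping the flag back is what permits a later start():

        override def stop(): Unit = {
          if (started.getAndSet(false)) {
            logInfo(s"Stopping $this")
            services.foreach(_.stop())
          }
        }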




[GitHub] spark pull request: [SPARK-11314] [YARN] WiP: add service API and ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151222678
  
    **[Test build #44368 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44368/consoleFull)** for PR 9182 at commit [`d018a38`](https://github.com/apache/spark/commit/d018a38c019906e26b9ab2371e5c1e6f1b2d2faa).




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43374435
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -17,17 +17,17 @@
     
     package org.apache.spark.scheduler.cluster
     
    -import scala.collection.mutable.ArrayBuffer
    -import scala.concurrent.{Future, ExecutionContext}
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
     
    -import org.apache.spark.{Logging, SparkContext}
     import org.apache.spark.rpc._
    -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.ui.JettyUtils
    -import org.apache.spark.util.{ThreadUtils, RpcUtils}
    -
    -import scala.util.control.NonFatal
    +import org.apache.spark.util.{RpcUtils, ThreadUtils}
    +import org.apache.spark.{Logging, SparkContext}
    --- End diff --
    
    I know what's up: it's sorting alphabetically within a group, and `{` comes after the alphabet, so child packages come first. I'll review these by hand and will have to do the same through the other patches. Something to call out in the Spark style guide, maybe; it covers the IDEA import patterns, but not this quirk.
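
    Hand-fixed, the spark group in that file would read (braced parent import first,
    not last where ASCII ordering drops it):

        import org.apache.spark.{Logging, SparkContext}
        import org.apache.spark.rpc._
        import org.apache.spark.scheduler._
        import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
        import org.apache.spark.ui.JettyUtils
        import org.apache.spark.util.{RpcUtils, ThreadUtils}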




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151877002
  
    **[Test build #44524 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/44524/consoleFull)** for PR 9182 at commit [`a4358d5`](https://github.com/apache/spark/commit/a4358d5b23dd2d7db706574124e4c69d1171ffb4).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155931357
  
     Build triggered.




[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-150724595
  
    Also, could you please follow the PR title convention? See https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PullRequest
    
    It would also be nice to file sub-tasks instead of piling a bunch of PRs on top of SPARK-1537.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219135
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    sparkContext = binding.sparkContext
    +    appId = binding.applicationId
    +    attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +        s" and attemptId $attemptId")
    --- End diff --
    
    nit: 2 spaces indent.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155931377
  
    Build started.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43498798
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    No, because that's not the YARN attempt ID. I explicitly use that to query the YARN RM and identify attempts/apps which have finished/disappeared without closing their event stream. So I do need that attempt ID, as would any other plugin that wanted to work with the RM. They could *probably* get away with it as a string, and if in future that turns out not to be the case, the use of a binding class will allow it to be added without breaking existing plugins, as adding a new parameter would.
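
    That is, a later field can be added with a default (`rmEndpoint` below is purely
    made up to illustrate the point):

        case class SchedulerExtensionServiceBinding(
            sparkContext: SparkContext,
            applicationId: ApplicationId,
            attemptId: Option[ApplicationAttemptId] = None,
            rmEndpoint: Option[String] = None)  // hypothetical future addition

    Existing service implementations are untouched by that: `start(binding)` keeps its
    signature, and only Spark itself ever constructs the binding.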





[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43246273
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-side schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYARN(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    --- End diff --
    
    OK




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151979757
  
    Build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151934205
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43376459
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    String parsing has proven fairly brittle in the past; the move from single to multiple attempts broke all apps trying to do it across versions (e.g. a Hadoop 2.2 parser in a 2.5 cluster). Unless you want to base-64 encode the protobuf representation, I'd avoid that.
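
    The typed records hand over the pieces directly, no parsing needed; a small
    illustration (`describe` is just made up here):

        def describe(attempt: ApplicationAttemptId): String = {
          // typed accessors instead of parsing attempt.toString
          val appId: ApplicationId = attempt.getApplicationId
          s"app $appId, attempt number ${attempt.getAttemptId}"
        }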




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r44824570
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,160 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated on a client, there's no attempt ID, as it lacks
    --- End diff --
    
    nit: "on a client" -> "in client mode"




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43244486
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    sparkContext = binding.sparkContext
    +    appId = binding.applicationId
    +    attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +        s" and attemptId $attemptId")
    --- End diff --
    
    Fixed, plus the lines directly below.





[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46432648
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,64 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  protected var appId: Option[ApplicationId] = None
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    --- End diff --
    
    Got it: IntelliJ IDEA policy games.




[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-150658207
  
    > There's duplication between the two schedulers, ..., because the common superclass, YarnSchedulerBackend is in spark-core
    
    I'd just move that class to the yarn module. There's no reason why it has to be in spark-core.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46432466
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,64 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    --- End diff --
    
    replacing with `/** Application ID. */`; the `bindToYarn()` scaladocs state what's needed.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-161461069
  
    Merged build finished. Test PASSed.



[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-149596180
  
    **[Test build #43985 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43985/consoleFull)** for PR 9182 at commit [`e796bb3`](https://github.com/apache/spark/commit/e796bb33d3128587f87529284fe45b37e1205de5).



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43359713
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    Could use the already existing `applicationId` and `applicationAttemptId` methods instead of having to add `bindToYarn` and related code you're adding in this PR? From a quick look at the code it seems like things are initialized in the right order for those to work.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152816395
  
    And I've already gone and moved to strings; never mind.
    
    The existing attempt IDs are nice for humans in the web UI, and potentially in the REST API, but they don't let you hook up to YARN's internals.
    
    Now: should I stay with String, or roll back?



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46335588
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,64 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    --- End diff --
    
    What service? It's not clear what the comment refers to.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151877399
  
    Build finished. Test PASSed.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43280831
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +          val instance = Utils.classForName(sClass)
    +            .newInstance()
    +            .asInstanceOf[SchedulerExtensionService]
    +          // bind this service
    +          instance.start(binding)
    +          logInfo(s"Service $sClass started")
    +          instance
    +        }
    +    }.map(_.toList).getOrElse(Nil)
    --- End diff --
    
    OK




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-155937044
  
    This just rolls back to the explicit app/attempt ID; it's got better guarantees about what's coming down.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43252317
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    --- End diff --
    
    nit: indentation here is weird.
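    
    For illustration, one way to flatten that chain (a sketch, not the committed fix) is an option-to-list pipeline:
    
    ```scala
    // Behaviourally equivalent loading chain with less nesting; assumes the
    // fields and imports from the diff above.
    services = sparkContext.getConf
      .getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
      .toList
      .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
      .map { sClass =>
        val instance = Utils.classForName(sClass)
          .newInstance()
          .asInstanceOf[SchedulerExtensionService]
        instance.start(binding) // bind this service
        logInfo(s"Service $sClass started")
        instance
      }
    ```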



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43252433
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +      s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +      .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +          val instance = Utils.classForName(sClass)
    +            .newInstance()
    --- End diff --
    
    Hmmm... `SchedulerExtensionServices` should probably mention that implementations must have an empty constructor.
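    
    For illustration, a minimal implementation that satisfies both requirements — a no-arg constructor so `newInstance()` can create it, and an idempotent `stop()` — might look like this (`LoggingExtensionService` is a hypothetical name, not part of the patch):
    
    ```scala
    import org.apache.spark.Logging
    
    // Hypothetical example service; loadable reflectively because it has an
    // implicit empty constructor.
    class LoggingExtensionService extends SchedulerExtensionService with Logging {
    
      override def start(binding: SchedulerExtensionServiceBinding): Unit = {
        logInfo(s"Bound to app ${binding.applicationId}," +
          s" attempt ${binding.attemptId}")
      }
    
      // Safe to call repeatedly, and safe even if start() was never invoked.
      override def stop(): Unit = {
        logInfo("Stopped")
      }
    }
    ```
    
    It would then be registered by listing its class name in `spark.yarn.services`.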



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46335614
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,64 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  protected var appId: Option[ApplicationId] = None
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    --- End diff --
    
    nit: indented one space too far.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43279624
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,136 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    --- End diff --
    
    correct: only the AM-side bindings have it. Will cover this in the javadocs.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9182



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43504882
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -17,17 +17,17 @@
     
     package org.apache.spark.scheduler.cluster
     
    -import scala.collection.mutable.ArrayBuffer
    -import scala.concurrent.{Future, ExecutionContext}
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
     
     import org.apache.spark.{Logging, SparkContext}
     import org.apache.spark.rpc._
    --- End diff --
    
    afraid so. This patch reorders things to be consistent with the style guidelines; that's usually very brittle against other patches, but as this diff includes a rename, things will break anyway...



[GitHub] spark pull request: SPARK-1537: pure service API and test service

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-149602550
  
    **[Test build #43985 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43985/consoleFull)** for PR 9182 at commit [`e796bb3`](https://github.com/apache/spark/commit/e796bb33d3128587f87529284fe45b37e1205de5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
       * `trait SchedulerExtensionService`
       * `case class SchedulerExtensionServiceBinding(`



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43244977
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    sparkContext = binding.sparkContext
    +    appId = binding.applicationId
    +    attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +        s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +        .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +                .newInstance()
    --- End diff --
    
    I thought about that, but consider this: when would you want a failure to load your listed extension services to be something not to fail on? Do you want it to downgrade quietly, or to fail noisily?
    
    Maybe we could make it an option.
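    
    As a sketch of what that option could look like — `spark.yarn.services.failOnError` is a hypothetical flag, not anything in the patch:
    
    ```scala
    val conf = binding.sparkContext.getConf
    // Hypothetical flag: fail fast (default) or quietly downgrade.
    val failOnError = conf.getBoolean("spark.yarn.services.failOnError", true)
    
    services = conf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
      .toList
      .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
      .flatMap { sClass =>
        try {
          val instance = Utils.classForName(sClass)
            .newInstance()
            .asInstanceOf[SchedulerExtensionService]
          instance.start(binding)
          Some(instance)
        } catch {
          // With failOnError = true the exception propagates; otherwise the
          // broken service is skipped with a warning.
          case e: Exception if !failOnError =>
            logWarning(s"Failed to start extension service $sClass", e)
            None
        }
      }
    ```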



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43479330
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    > For the history stuff I do need the full attemptID for its uniqueness.
    
    Couldn't you create a unique ID from app ID + current non-universally-unique attempt ID?
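    
    A sketch of that composition (an illustrative helper, not code from the patch): a YARN application ID already embeds the RM cluster timestamp, so appending the per-application attempt counter gives a string that is unique across AM restarts.
    
    ```scala
    import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    
    // Illustrative helper, producing e.g. "application_1446665868375_0001_0002".
    def uniqueAttemptId(
        appId: ApplicationId,
        attemptId: Option[ApplicationAttemptId]): String = {
      val counter = attemptId.map(_.getAttemptId).getOrElse(1)
      f"${appId.toString}%s_$counter%04d"
    }
    ```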



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43219282
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,137 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.util.Utils
    +import org.apache.spark.{Logging, SparkContext}
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped
    + *
    + * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]]
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId optional AttemptID.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + *
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private var sparkContext: SparkContext = _
    +  private var appId: ApplicationId = _
    +  private var attemptId: Option[ApplicationAttemptId] = _
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services are then ready for `init()` and `start()` calls
    +
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    sparkContext = binding.sparkContext
    +    appId = binding.applicationId
    +    attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app ${binding.applicationId}" +
    +        s" and attemptId $attemptId")
    +
    +    services = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +        .map { s =>
    +      s.split(",").map(_.trim()).filter(!_.isEmpty)
    +        .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +                .newInstance()
    +                .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }
    +    }.map(_.toList).getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = {
    +    services
    +  }
    +
    +  override def stop(): Unit = {
    +    logInfo(s"Stopping $this")
    --- End diff --
    
    I think here we need to set `started` to `false`.
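    
    That is, something like the sketch below against the quoted diff — guarding on the flag also keeps `stop()` idempotent, as the trait's contract demands (illustrative, not the merged code):
    
    ```scala
    override def stop(): Unit = {
      // getAndSet(false) returns true only for the first caller after a
      // successful start(), so repeated stop() calls are no-ops.
      if (started.getAndSet(false)) {
        logInfo(s"Stopping $this")
        services.foreach(_.stop())
      }
    }
    ```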



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-156540005
  
    @steveloughran could you take a look at my previous suggestion? With your current patch there are two ways in which the application / attempt IDs are propagated to the scheduler backend; it would be nice to consolidate them.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152870622
  
    BTW, if you're keeping `bindToYarn` and friends, you could change `applicationId` and `applicationAttemptId` to return the values you're setting there, which also means you could have a single implementation for the two methods (instead of the current separate impls for client and cluster modes).
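    
    That consolidation might read roughly as follows, assuming the `Option`-typed fields set by `bindToYarn()` (a sketch, not necessarily the merged code):
    
    ```scala
    // Single implementations in YarnSchedulerBackend, shared by both deploy modes.
    override def applicationId(): String =
      appId.map(_.toString).getOrElse {
        logWarning("Application ID is not initialized yet.")
        super.applicationId()
      }
    
    override def applicationAttemptId(): Option[String] =
      attemptId.map(_.toString)
    ```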



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43253815
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -17,17 +17,17 @@
     
     package org.apache.spark.scheduler.cluster
     
    -import scala.collection.mutable.ArrayBuffer
    -import scala.concurrent.{Future, ExecutionContext}
    +import scala.concurrent.{ExecutionContext, Future}
    +import scala.util.control.NonFatal
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
     
    -import org.apache.spark.{Logging, SparkContext}
     import org.apache.spark.rpc._
    -import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.scheduler._
    +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
     import org.apache.spark.ui.JettyUtils
    -import org.apache.spark.util.{ThreadUtils, RpcUtils}
    -
    -import scala.util.control.NonFatal
    +import org.apache.spark.util.{RpcUtils, ThreadUtils}
    +import org.apache.spark.{Logging, SparkContext}
    --- End diff --
    
    nit: move before previous import



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43253907
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,38 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-side schedulers */
    --- End diff --
    
    nit: "client mode schedulers" is more in line with how the rest of code refers to things.



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46434094
  
    --- Diff: yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import org.scalatest.BeforeAndAfter
    +
    +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite}
    +
    +/**
    + * Test the integration with [[SchedulerExtensionServices]]
    + */
    +class ExtensionServiceIntegrationSuite extends SparkFunSuite
    +  with BeforeAndAfter
    +  with Logging {
    +
    +  val applicationId = new StubApplicationId(0, 1111L)
    +  val attemptId = new StubApplicationAttemptId(applicationId, 1)
    +  var sparkCtx: SparkContext = _
    +
    +  /*
    +   * Setup phase creates the spark context
    +   */
    +  before {
    +    val sparkConf = new SparkConf()
    +    sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
    +      classOf[SimpleExtensionService].getName())
    +    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
    +    sparkCtx = new SparkContext(sparkConf)
    --- End diff --
    
    will do
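    
    (For context, the matching teardown for a context created in `before` is the standard ScalaTest pattern below — a generic sketch, not necessarily the exact change requested:)
    
    ```scala
    after {
      // Stop the context created in the setup phase so the suite
      // doesn't leak a SparkContext between tests.
      if (sparkCtx != null) {
        sparkCtx.stop()
        sparkCtx = null
      }
    }
    ```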



[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r43498840
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala ---
    @@ -51,6 +51,41 @@ private[spark] abstract class YarnSchedulerBackend(
     
       private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
     
    +  /** Application ID. Must be set by a subclass before starting the service */
    +  private var appId: ApplicationId = null
    +
    +  /** Attempt ID. This is unset for client-mode schedulers */
    +  private var attemptId: Option[ApplicationAttemptId] = None
    +
    +  /** Scheduler extension services */
    +  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
    +
    +  /**
    +    * Bind to YARN. This *must* be done before calling [[start()]].
    +    *
    +    * @param appId YARN application ID
    +    * @param attemptId Optional YARN attempt ID
    +    */
    +  protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
    +    this.appId = appId
    +    this.attemptId = attemptId
    +  }
    +
    +  override def start() {
    +    require(appId != null, "application ID unset")
    +    val binding = SchedulerExtensionServiceBinding(sc, appId, attemptId)
    +    services.start(binding)
    --- End diff --
    
    Regarding that cluster timestamp: it came in with RM HA; it's the thing that broke any parsing of the Hadoop 2.2-era string format.





[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9182#discussion_r46335932
  
    --- Diff: yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.scheduler.cluster
    +
    +import java.util.concurrent.atomic.AtomicBoolean
    +
    +import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
    +
    +import org.apache.spark.{Logging, SparkContext}
    +import org.apache.spark.util.Utils
    +
    +/**
    + * An extension service that can be loaded into a Spark YARN scheduler.
    + * A Service that can be started and stopped.
    + *
    + * 1. For implementations to be loadable by `SchedulerExtensionServices`,
    + * they must provide an empty constructor.
    + * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    + * never invoked.
    + */
    +trait SchedulerExtensionService {
    +
    +  /**
    +   * Start the extension service. This should be a no-op if
    +   * called more than once.
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit
    +
    +  /**
    +   * Stop the service
    +   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
    +   * never invoked.
    +   */
    +  def stop(): Unit
    +}
    +
    +/**
    + * Binding information for a [[SchedulerExtensionService]].
    + *
    + * The attempt ID will be set if the service is started within a YARN application master;
    + * there is then a different attempt ID for every time that AM is restarted.
    + * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
    + * this information.
    + * @param sparkContext current spark context
    + * @param applicationId YARN application ID
    + * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
    + *                  cluster mode.
    + */
    +case class SchedulerExtensionServiceBinding(
    +    sparkContext: SparkContext,
    +    applicationId: ApplicationId,
    +    attemptId: Option[ApplicationAttemptId] = None)
    +
    +/**
    + * Container for [[SchedulerExtensionService]] instances.
    + *
    + * Loads Extension Services from the configuration property
    + * `"spark.yarn.services"`, instantiates and starts them.
    + * When stopped, it stops all child entries.
    + *
    + * The order in which child extension services are started and stopped
    + * is undefined.
    + */
    +private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
    +    with Logging {
    +  private var serviceOption: Option[String] = None
    +  private var services: List[SchedulerExtensionService] = Nil
    +  private val started = new AtomicBoolean(false)
    +  private var binding: SchedulerExtensionServiceBinding = _
    +
    +  /**
    +   * Binding operation will load the named services and call bind on them too; the
    +   * entire set of services is then ready for `init()` and `start()` calls.
    +   *
    +   * @param binding binding to the spark application and YARN
    +   */
    +  def start(binding: SchedulerExtensionServiceBinding): Unit = {
    +    if (started.getAndSet(true)) {
    +      logWarning("Ignoring re-entrant start operation")
    +      return
    +    }
    +    require(binding.sparkContext != null, "Null context parameter")
    +    require(binding.applicationId != null, "Null appId parameter")
    +    this.binding = binding
    +    val sparkContext = binding.sparkContext
    +    val appId = binding.applicationId
    +    val attemptId = binding.attemptId
    +    logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
    +
    +    serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
    +    services = serviceOption
    +      .map { s =>
    +        s.split(",").map(_.trim()).filter(!_.isEmpty)
    +          .map { sClass =>
    +            val instance = Utils.classForName(sClass)
    +              .newInstance()
    +              .asInstanceOf[SchedulerExtensionService]
    +            // bind this service
    +            instance.start(binding)
    +            logInfo(s"Service $sClass started")
    +            instance
    +          }.toList
    +      }.getOrElse(Nil)
    +  }
    +
    +  /**
    +   * Get the list of services.
    +   *
    +   * @return a list of services; Nil until the service is started
    +   */
    +  def getServices: List[SchedulerExtensionService] = services
    +
    +  /**
    +   * Stop the services; idempotent.
    +   *
    +   */
    +  override def stop(): Unit = {
    +    if (started.getAndSet(false)) {
    +      logInfo(s"Stopping $this")
    +      services.foreach { s =>
    +        try {
    --- End diff --
    
    `Utils.tryLogNonFatalError`?
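
    For reference, applying that suggestion would make the stop loop look roughly
    like the sketch below, assuming `Utils.tryLogNonFatalError` logs and swallows
    non-fatal errors as it does elsewhere in Spark:

        override def stop(): Unit = {
          if (started.getAndSet(false)) {
            logInfo(s"Stopping $this")
            services.foreach { s =>
              // Log and continue, so one failing service cannot block the rest.
              Utils.tryLogNonFatalError {
                s.stop()
              }
            }
          }
        }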




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-162031306
  
    I'll add an exclusion.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152192810
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151914699
  
    Build started.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-152864501
  
    > now, should I stay with String or roll back?
    
    Up to you. I was hoping you wouldn't need the full attempt ID, and could instead get rid of the code you're adding, but...
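
    To make the trade-off concrete (a sketch, not code from the PR): with the full
    `ApplicationAttemptId` in the binding, each service can derive whichever form
    it needs, instead of the backend committing to one string format:

        // Given a binding: SchedulerExtensionServiceBinding
        val fullForm: Option[String] = binding.attemptId.map(_.toString)       // e.g. appattempt_<clusterTimestamp>_<appId>_<attempt>
        val counterOnly: Option[Int] = binding.attemptId.map(_.getAttemptId)   // just the attempt counter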




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151975470
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request: [SPARK-11314] [YARN] add service API and test ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9182#issuecomment-151934239
  
    Merged build started.

