Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/25 19:57:56 UTC

[spark] branch master updated: [SPARK-26788][YARN] Remove SchedulerExtensionService.

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4808393  [SPARK-26788][YARN] Remove SchedulerExtensionService.
4808393 is described below

commit 4808393449ccad2c6bc73c91d0ed8dd8f60c7054
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Mon Feb 25 13:57:37 2019 -0600

    [SPARK-26788][YARN] Remove SchedulerExtensionService.
    
    Since the yarn module is actually private to Spark, this interface was
    never truly "public". It also has no use inside of Spark itself, so
    let's avoid shipping a yarn-specific extension point that isn't public,
    and instead point any potential users at more general solutions, like
    using a SparkListener (see the sketch below).
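
    As a rough, illustrative sketch only, a user who previously implemented
    SchedulerExtensionService for start/stop hooks could get similar
    lifecycle callbacks from a SparkListener. Only the SparkListener API
    used below is real; the listener class itself is hypothetical:

        import org.apache.spark.scheduler.{SparkListener,
          SparkListenerApplicationEnd, SparkListenerApplicationStart}

        // Hypothetical replacement for a former SchedulerExtensionService.
        class MyLifecycleListener extends SparkListener {

          // Roughly equivalent to start(): fires once, and the application
          // ID is available on the event instead of a YARN binding object.
          override def onApplicationStart(
              event: SparkListenerApplicationStart): Unit = {
            println(s"App started: ${event.appId.getOrElse("unknown")}")
          }

          // Roughly equivalent to stop(): fires once when the app ends.
          override def onApplicationEnd(
              event: SparkListenerApplicationEnd): Unit = {
            println("App stopped")
          }
        }

    Such a listener can be registered programmatically with
    SparkContext.addSparkListener, or by class name in configuration as
    shown after the diff below.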
    
    Closes #23839 from vanzin/SPARK-26788.
    
    Authored-by: Marcelo Vanzin <va...@cloudera.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../main/scala/org/apache/spark/SparkConf.scala    |   3 +-
 .../org/apache/spark/deploy/yarn/config.scala      |   6 -
 .../cluster/SchedulerExtensionService.scala        | 143 ---------------------
 .../scheduler/cluster/YarnSchedulerBackend.scala   |   7 -
 .../cluster/ExtensionServiceIntegrationSuite.scala |  72 -----------
 .../scheduler/cluster/SimpleExtensionService.scala |  34 -----
 .../cluster/StubApplicationAttemptId.scala         |  48 -------
 .../scheduler/cluster/StubApplicationId.scala      |  42 ------
 8 files changed, 2 insertions(+), 353 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e686e07..529ca3f 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -638,7 +638,8 @@ private[spark] object SparkConf extends Logging {
       DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
         "Not used anymore. Please use spark.shuffle.service.index.cache.size"),
       DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
-      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.")
+      DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."),
+      DeprecatedConfig("spark.yarn.services", "3.0.0", "Feature no longer available.")
     )
 
     Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index bd5e136..5f8739b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -181,12 +181,6 @@ package object config {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("200ms")
 
-  private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
-    .doc("A comma-separated list of class names of services to add to the scheduler.")
-    .stringConf
-    .toSequence
-    .createWithDefault(Nil)
-
   private[spark] val AM_FINAL_MSG_LIMIT = ConfigBuilder("spark.yarn.am.finalMessageLimit")
     .doc("The limit size of final diagnostic message for our ApplicationMaster to unregister from" +
       " the ResourceManager.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
deleted file mode 100644
index 7d15f0e..0000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * An extension service that can be loaded into a Spark YARN scheduler.
- * A Service that can be started and stopped.
- *
- * 1. For implementations to be loadable by `SchedulerExtensionServices`,
- * they must provide an empty constructor.
- * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
-trait SchedulerExtensionService {
-
-  /**
-   * Start the extension service. This should be a no-op if
-   * called more than once.
-   * @param binding binding to the spark application and YARN
-   */
-  def start(binding: SchedulerExtensionServiceBinding): Unit
-
-  /**
-   * Stop the service
-   * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
-   * never invoked.
-   */
-  def stop(): Unit
-}
-
-/**
- * Binding information for a [[SchedulerExtensionService]].
- *
- * The attempt ID will be set if the service is started within a YARN application master;
- * there is then a different attempt ID for every time that AM is restarted.
- * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
- * this information.
- * @param sparkContext current spark context
- * @param applicationId YARN application ID
- * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
- *                  cluster mode.
- */
-case class SchedulerExtensionServiceBinding(
-    sparkContext: SparkContext,
-    applicationId: ApplicationId,
-    attemptId: Option[ApplicationAttemptId] = None)
-
-/**
- * Container for [[SchedulerExtensionService]] instances.
- *
- * Loads Extension Services from the configuration property
- * `"spark.yarn.services"`, instantiates and starts them.
- * When stopped, it stops all child entries.
- *
- * The order in which child extension services are started and stopped
- * is undefined.
- */
-private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
-    with Logging {
-  private var serviceOption: Option[String] = None
-  private var services: List[SchedulerExtensionService] = Nil
-  private val started = new AtomicBoolean(false)
-  private var binding: SchedulerExtensionServiceBinding = _
-
-  /**
-   * Binding operation will load the named services and call bind on them too; the
-   * entire set of services are then ready for `init()` and `start()` calls.
-   *
-   * @param binding binding to the spark application and YARN
-   */
-  def start(binding: SchedulerExtensionServiceBinding): Unit = {
-    if (started.getAndSet(true)) {
-      logWarning("Ignoring re-entrant start operation")
-      return
-    }
-    require(binding.sparkContext != null, "Null context parameter")
-    require(binding.applicationId != null, "Null appId parameter")
-    this.binding = binding
-    val sparkContext = binding.sparkContext
-    val appId = binding.applicationId
-    val attemptId = binding.attemptId
-    logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
-
-    services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
-      val instance = Utils.classForName(sClass)
-        .getConstructor().newInstance()
-        .asInstanceOf[SchedulerExtensionService]
-      // bind this service
-      instance.start(binding)
-      logInfo(s"Service $sClass started")
-      instance
-    }.toList
-  }
-
-  /**
-   * Get the list of services.
-   *
-   * @return a list of services; Nil until the service is started
-   */
-  def getServices: List[SchedulerExtensionService] = services
-
-  /**
-   * Stop the services; idempotent.
-   *
-   */
-  override def stop(): Unit = {
-    if (started.getAndSet(false)) {
-      logInfo(s"Stopping $this")
-      services.foreach { s =>
-        Utils.tryLogNonFatalError(s.stop())
-      }
-    }
-  }
-
-  override def toString(): String = s"""SchedulerExtensionServices
-    |(serviceOption=$serviceOption,
-    | services=$services,
-    | started=$started)""".stripMargin
-}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 78cd6a2..a8472b4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -73,10 +73,6 @@ private[spark] abstract class YarnSchedulerBackend(
   /** Attempt ID. This is unset for client-mode schedulers */
   private var attemptId: Option[ApplicationAttemptId] = None
 
-  /** Scheduler extension services. */
-  private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
-
-
   /**
    * Bind to YARN. This *must* be done before calling [[start()]].
    *
@@ -90,8 +86,6 @@ private[spark] abstract class YarnSchedulerBackend(
 
   protected def startBindings(): Unit = {
     require(appId.isDefined, "application ID unset")
-    val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
-    services.start(binding)
   }
 
   override def stop(): Unit = {
@@ -102,7 +96,6 @@ private[spark] abstract class YarnSchedulerBackend(
       super.stop()
     } finally {
       stopped.set(true)
-      services.stop()
     }
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
deleted file mode 100644
index 6ea7984..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-
-/**
- * Test the integration with [[SchedulerExtensionServices]]
- */
-class ExtensionServiceIntegrationSuite extends SparkFunSuite
-  with LocalSparkContext with BeforeAndAfter
-  with Logging {
-
-  val applicationId = new StubApplicationId(0, 1111L)
-  val attemptId = new StubApplicationAttemptId(applicationId, 1)
-
-  /*
-   * Setup phase creates the spark context
-   */
-  before {
-    val sparkConf = new SparkConf()
-    sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
-    sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
-    sc = new SparkContext(sparkConf)
-  }
-
-  test("Instantiate") {
-    val services = new SchedulerExtensionServices()
-    assertResult(Nil, "non-nil service list") {
-      services.getServices
-    }
-    services.start(SchedulerExtensionServiceBinding(sc, applicationId))
-    services.stop()
-  }
-
-  test("Contains SimpleExtensionService Service") {
-    val services = new SchedulerExtensionServices()
-    try {
-      services.start(SchedulerExtensionServiceBinding(sc, applicationId))
-      val serviceList = services.getServices
-      assert(serviceList.nonEmpty, "empty service list")
-      val (service :: Nil) = serviceList
-      val simpleService = service.asInstanceOf[SimpleExtensionService]
-      assert(simpleService.started.get, "service not started")
-      services.stop()
-      assert(!simpleService.started.get, "service not stopped")
-    } finally {
-      services.stop()
-    }
-  }
-}
-
-
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
deleted file mode 100644
index 9b8c98cd..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-private[spark] class SimpleExtensionService extends SchedulerExtensionService {
-
-  /** started flag; set in the `start()` call, stopped in `stop()`. */
-  val started = new AtomicBoolean(false)
-
-  override def start(binding: SchedulerExtensionServiceBinding): Unit = {
-    started.set(true)
-  }
-
-  override def stop(): Unit = {
-    started.set(false)
-  }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
deleted file mode 100644
index 4b57b95..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-/**
- * A stub application ID; can be set in constructor and/or updated later.
- * @param applicationId application ID
- * @param attempt an attempt counter
- */
-class StubApplicationAttemptId(var applicationId: ApplicationId, var attempt: Int)
-    extends ApplicationAttemptId {
-
-  override def setApplicationId(appID: ApplicationId): Unit = {
-    applicationId = appID
-  }
-
-  override def getAttemptId: Int = {
-    attempt
-  }
-
-  override def setAttemptId(attemptId: Int): Unit = {
-    attempt = attemptId
-  }
-
-  override def getApplicationId: ApplicationId = {
-    applicationId
-  }
-
-  override def build(): Unit = {
-  }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
deleted file mode 100644
index bffa0e0..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.ApplicationId
-
-/**
- * Simple Testing Application Id; ID and cluster timestamp are set in constructor
- * and cannot be updated.
- * @param id app id
- * @param clusterTimestamp timestamp
- */
-private[spark] class StubApplicationId(id: Int, clusterTimestamp: Long) extends ApplicationId {
-  override def getId: Int = {
-    id
-  }
-
-  override def getClusterTimestamp: Long = {
-    clusterTimestamp
-  }
-
-  override def setId(id: Int): Unit = {}
-
-  override def setClusterTimestamp(clusterTimestamp: Long): Unit = {}
-
-  override def build(): Unit = {}
-}
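
For anyone migrating off the removed "spark.yarn.services" option (a
comma-separated list of class names), the closest general-purpose
equivalent is Spark's standard "spark.extraListeners" option, which
likewise instantiates the named SparkListener classes; they need a
zero-argument constructor (or one taking a SparkConf). An illustrative
invocation, where every com.example name is hypothetical:

    spark-submit \
      --conf spark.extraListeners=com.example.MyLifecycleListener \
      --class com.example.MyApp \
      my-app.jar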


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org