You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/25 19:57:56 UTC
[spark] branch master updated: [SPARK-26788][YARN] Remove SchedulerExtensionService.
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4808393 [SPARK-26788][YARN] Remove SchedulerExtensionService.
4808393 is described below
commit 4808393449ccad2c6bc73c91d0ed8dd8f60c7054
Author: Marcelo Vanzin <va...@cloudera.com>
AuthorDate: Mon Feb 25 13:57:37 2019 -0600
[SPARK-26788][YARN] Remove SchedulerExtensionService.
Since the yarn module is actually private to Spark, this interface was never
really "public". Since it has no use inside of Spark, let's avoid adding
a yarn-specific extension that isn't public, and point any potential users
to more general solutions (like using a SparkListener).
Closes #23839 from vanzin/SPARK-26788.
Authored-by: Marcelo Vanzin <va...@cloudera.com>
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../main/scala/org/apache/spark/SparkConf.scala | 3 +-
.../org/apache/spark/deploy/yarn/config.scala | 6 -
.../cluster/SchedulerExtensionService.scala | 143 ---------------------
.../scheduler/cluster/YarnSchedulerBackend.scala | 7 -
.../cluster/ExtensionServiceIntegrationSuite.scala | 72 -----------
.../scheduler/cluster/SimpleExtensionService.scala | 34 -----
.../cluster/StubApplicationAttemptId.scala | 48 -------
.../scheduler/cluster/StubApplicationId.scala | 42 ------
8 files changed, 2 insertions(+), 353 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index e686e07..529ca3f 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -638,7 +638,8 @@ private[spark] object SparkConf extends Logging {
DeprecatedConfig("spark.shuffle.service.index.cache.entries", "2.3.0",
"Not used anymore. Please use spark.shuffle.service.index.cache.size"),
DeprecatedConfig("spark.yarn.credentials.file.retention.count", "2.4.0", "Not used anymore."),
- DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore.")
+ DeprecatedConfig("spark.yarn.credentials.file.retention.days", "2.4.0", "Not used anymore."),
+ DeprecatedConfig("spark.yarn.services", "3.0.0", "Feature no longer available.")
)
Map(configs.map { cfg => (cfg.key -> cfg) } : _*)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index bd5e136..5f8739b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -181,12 +181,6 @@ package object config {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("200ms")
- private[spark] val SCHEDULER_SERVICES = ConfigBuilder("spark.yarn.services")
- .doc("A comma-separated list of class names of services to add to the scheduler.")
- .stringConf
- .toSequence
- .createWithDefault(Nil)
-
private[spark] val AM_FINAL_MSG_LIMIT = ConfigBuilder("spark.yarn.am.finalMessageLimit")
.doc("The limit size of final diagnostic message for our ApplicationMaster to unregister from" +
" the ResourceManager.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
deleted file mode 100644
index 7d15f0e..0000000
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-import org.apache.spark.util.Utils
-
-/**
- * An extension service that can be loaded into a Spark YARN scheduler.
- * A Service that can be started and stopped.
- *
- * 1. For implementations to be loadable by `SchedulerExtensionServices`,
- * they must provide an empty constructor.
- * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
-trait SchedulerExtensionService {
-
- /**
- * Start the extension service. This should be a no-op if
- * called more than once.
- * @param binding binding to the spark application and YARN
- */
- def start(binding: SchedulerExtensionServiceBinding): Unit
-
- /**
- * Stop the service
- * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
- * never invoked.
- */
- def stop(): Unit
-}
-
-/**
- * Binding information for a [[SchedulerExtensionService]].
- *
- * The attempt ID will be set if the service is started within a YARN application master;
- * there is then a different attempt ID for every time that AM is restarted.
- * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
- * this information.
- * @param sparkContext current spark context
- * @param applicationId YARN application ID
- * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
- * cluster mode.
- */
-case class SchedulerExtensionServiceBinding(
- sparkContext: SparkContext,
- applicationId: ApplicationId,
- attemptId: Option[ApplicationAttemptId] = None)
-
-/**
- * Container for [[SchedulerExtensionService]] instances.
- *
- * Loads Extension Services from the configuration property
- * `"spark.yarn.services"`, instantiates and starts them.
- * When stopped, it stops all child entries.
- *
- * The order in which child extension services are started and stopped
- * is undefined.
- */
-private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
- with Logging {
- private var serviceOption: Option[String] = None
- private var services: List[SchedulerExtensionService] = Nil
- private val started = new AtomicBoolean(false)
- private var binding: SchedulerExtensionServiceBinding = _
-
- /**
- * Binding operation will load the named services and call bind on them too; the
- * entire set of services are then ready for `init()` and `start()` calls.
- *
- * @param binding binding to the spark application and YARN
- */
- def start(binding: SchedulerExtensionServiceBinding): Unit = {
- if (started.getAndSet(true)) {
- logWarning("Ignoring re-entrant start operation")
- return
- }
- require(binding.sparkContext != null, "Null context parameter")
- require(binding.applicationId != null, "Null appId parameter")
- this.binding = binding
- val sparkContext = binding.sparkContext
- val appId = binding.applicationId
- val attemptId = binding.attemptId
- logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
-
- services = sparkContext.conf.get(SCHEDULER_SERVICES).map { sClass =>
- val instance = Utils.classForName(sClass)
- .getConstructor().newInstance()
- .asInstanceOf[SchedulerExtensionService]
- // bind this service
- instance.start(binding)
- logInfo(s"Service $sClass started")
- instance
- }.toList
- }
-
- /**
- * Get the list of services.
- *
- * @return a list of services; Nil until the service is started
- */
- def getServices: List[SchedulerExtensionService] = services
-
- /**
- * Stop the services; idempotent.
- *
- */
- override def stop(): Unit = {
- if (started.getAndSet(false)) {
- logInfo(s"Stopping $this")
- services.foreach { s =>
- Utils.tryLogNonFatalError(s.stop())
- }
- }
- }
-
- override def toString(): String = s"""SchedulerExtensionServices
- |(serviceOption=$serviceOption,
- | services=$services,
- | started=$started)""".stripMargin
-}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 78cd6a2..a8472b4 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -73,10 +73,6 @@ private[spark] abstract class YarnSchedulerBackend(
/** Attempt ID. This is unset for client-mode schedulers */
private var attemptId: Option[ApplicationAttemptId] = None
- /** Scheduler extension services. */
- private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
-
-
/**
* Bind to YARN. This *must* be done before calling [[start()]].
*
@@ -90,8 +86,6 @@ private[spark] abstract class YarnSchedulerBackend(
protected def startBindings(): Unit = {
require(appId.isDefined, "application ID unset")
- val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
- services.start(binding)
}
override def stop(): Unit = {
@@ -102,7 +96,6 @@ private[spark] abstract class YarnSchedulerBackend(
super.stop()
} finally {
stopped.set(true)
- services.stop()
}
}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
deleted file mode 100644
index 6ea7984..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.deploy.yarn.config._
-import org.apache.spark.internal.Logging
-
-/**
- * Test the integration with [[SchedulerExtensionServices]]
- */
-class ExtensionServiceIntegrationSuite extends SparkFunSuite
- with LocalSparkContext with BeforeAndAfter
- with Logging {
-
- val applicationId = new StubApplicationId(0, 1111L)
- val attemptId = new StubApplicationAttemptId(applicationId, 1)
-
- /*
- * Setup phase creates the spark context
- */
- before {
- val sparkConf = new SparkConf()
- sparkConf.set(SCHEDULER_SERVICES, Seq(classOf[SimpleExtensionService].getName()))
- sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
- sc = new SparkContext(sparkConf)
- }
-
- test("Instantiate") {
- val services = new SchedulerExtensionServices()
- assertResult(Nil, "non-nil service list") {
- services.getServices
- }
- services.start(SchedulerExtensionServiceBinding(sc, applicationId))
- services.stop()
- }
-
- test("Contains SimpleExtensionService Service") {
- val services = new SchedulerExtensionServices()
- try {
- services.start(SchedulerExtensionServiceBinding(sc, applicationId))
- val serviceList = services.getServices
- assert(serviceList.nonEmpty, "empty service list")
- val (service :: Nil) = serviceList
- val simpleService = service.asInstanceOf[SimpleExtensionService]
- assert(simpleService.started.get, "service not started")
- services.stop()
- assert(!simpleService.started.get, "service not stopped")
- } finally {
- services.stop()
- }
- }
-}
-
-
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
deleted file mode 100644
index 9b8c98cd..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import java.util.concurrent.atomic.AtomicBoolean
-
-private[spark] class SimpleExtensionService extends SchedulerExtensionService {
-
- /** started flag; set in the `start()` call, stopped in `stop()`. */
- val started = new AtomicBoolean(false)
-
- override def start(binding: SchedulerExtensionServiceBinding): Unit = {
- started.set(true)
- }
-
- override def stop(): Unit = {
- started.set(false)
- }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
deleted file mode 100644
index 4b57b95..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
-
-/**
- * A stub application ID; can be set in constructor and/or updated later.
- * @param applicationId application ID
- * @param attempt an attempt counter
- */
-class StubApplicationAttemptId(var applicationId: ApplicationId, var attempt: Int)
- extends ApplicationAttemptId {
-
- override def setApplicationId(appID: ApplicationId): Unit = {
- applicationId = appID
- }
-
- override def getAttemptId: Int = {
- attempt
- }
-
- override def setAttemptId(attemptId: Int): Unit = {
- attempt = attemptId
- }
-
- override def getApplicationId: ApplicationId = {
- applicationId
- }
-
- override def build(): Unit = {
- }
-}
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
deleted file mode 100644
index bffa0e0..0000000
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.ApplicationId
-
-/**
- * Simple Testing Application Id; ID and cluster timestamp are set in constructor
- * and cannot be updated.
- * @param id app id
- * @param clusterTimestamp timestamp
- */
-private[spark] class StubApplicationId(id: Int, clusterTimestamp: Long) extends ApplicationId {
- override def getId: Int = {
- id
- }
-
- override def getClusterTimestamp: Long = {
- clusterTimestamp
- }
-
- override def setId(id: Int): Unit = {}
-
- override def setClusterTimestamp(clusterTimestamp: Long): Unit = {}
-
- override def build(): Unit = {}
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org