Posted to commits@spark.apache.org by pw...@apache.org on 2014/06/08 21:27:46 UTC

git commit: SPARK-1898: In deploy.yarn.Client, use YarnClient not YarnClientImpl

Repository: spark
Updated Branches:
  refs/heads/master a338834f9 -> ee96e9406


SPARK-1898: In deploy.yarn.Client, use YarnClient not YarnClientImpl

https://issues.apache.org/jira/browse/SPARK-1898

Author: Colin Patrick McCabe <cm...@cloudera.com>

Closes #850 from cmccabe/master and squashes the following commits:

d66eddc [Colin Patrick McCabe] SPARK-1898: In deploy.yarn.Client, use YarnClient rather than YarnClientImpl
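
The diff below replaces inheritance from the private YarnClientImpl with composition over the public YarnClient API: Client now holds a client obtained from YarnClient.createYarnClient and delegates to it. As a rough, self-contained sketch of that pattern (not Spark's actual code; the object name YarnClientSketch and the println are illustrative only, and running it assumes a reachable YARN ResourceManager):

  import org.apache.hadoop.yarn.client.api.YarnClient
  import org.apache.hadoop.yarn.conf.YarnConfiguration

  object YarnClientSketch {
    def main(args: Array[String]): Unit = {
      // Obtain a client from the public factory instead of extending
      // the implementation class YarnClientImpl.
      val yarnClient = YarnClient.createYarnClient
      yarnClient.init(new YarnConfiguration())  // configure the client service
      yarnClient.start()                        // start it before making RM calls
      try {
        // Ask the ResourceManager for a new application id.
        val newApp = yarnClient.createApplication()
        val appId = newApp.getNewApplicationResponse().getApplicationId()
        println("Got application id: " + appId)
      } finally {
        yarnClient.stop()                       // release ResourceManager-side resources
      }
    }
  }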


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee96e940
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee96e940
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee96e940

Branch: refs/heads/master
Commit: ee96e9406613e621837360b15c34ea7c7220a7a3
Parents: a338834
Author: Colin Patrick McCabe <cm...@cloudera.com>
Authored: Sun Jun 8 12:27:34 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Jun 8 12:27:34 2014 -0700

----------------------------------------------------------------------
 .../cluster/YarnClientSchedulerBackend.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/Client.scala   | 25 +++++++++++++-------
 2 files changed, 17 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee96e940/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index e01ed5a..039cf4f 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -112,7 +112,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   override def stop() {
     super.stop()
-    client.stop()
+    client.stop
     logInfo("Stopped")
   }
 

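The one-line change above drops the parentheses on client.stop because, in the reworked Client in the next file, stop becomes a parameterless method that simply delegates to the wrapped YarnClient:

  // From the Client rewrite below:
  def stop = yarnClient.stop
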
http://git-wip-us.apache.org/repos/asf/spark/blob/ee96e940/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 1b6bfb4..393edd1 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl
+import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, Records}
@@ -37,7 +37,9 @@ import org.apache.spark.{Logging, SparkConf}
  * Version of [[org.apache.spark.deploy.yarn.ClientBase]] tailored to YARN's stable API.
  */
 class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: SparkConf)
-  extends YarnClientImpl with ClientBase with Logging {
+  extends ClientBase with Logging {
+
+  val yarnClient = YarnClient.createYarnClient
 
   def this(clientArgs: ClientArguments, spConf: SparkConf) =
     this(clientArgs, new Configuration(), spConf)
@@ -53,8 +55,8 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def runApp(): ApplicationId = {
     validateArgs()
     // Initialize and start the client service.
-    init(yarnConf)
-    start()
+    yarnClient.init(yarnConf)
+    yarnClient.start()
 
     // Log details about this YARN cluster (e.g, the number of slave machines/NodeManagers).
     logClusterResourceDetails()
@@ -63,7 +65,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
     // interface).
 
     // Get a new client application.
-    val newApp = super.createApplication()
+    val newApp = yarnClient.createApplication()
     val newAppResponse = newApp.getNewApplicationResponse()
     val appId = newAppResponse.getApplicationId()
 
@@ -99,11 +101,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   }
 
   def logClusterResourceDetails() {
-    val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics
+    val clusterMetrics: YarnClusterMetrics = yarnClient.getYarnClusterMetrics
     logInfo("Got Cluster metric info from ApplicationsManager (ASM), number of NodeManagers: " +
       clusterMetrics.getNumNodeManagers)
 
-    val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue)
+    val queueInfo: QueueInfo = yarnClient.getQueueInfo(args.amQueue)
     logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s,
       queueApplicationCount = %s, queueChildQueueCount = %s""".format(
         queueInfo.getQueueName,
@@ -132,15 +134,20 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa
   def submitApp(appContext: ApplicationSubmissionContext) = {
     // Submit the application to the applications manager.
     logInfo("Submitting application to ASM")
-    super.submitApplication(appContext)
+    yarnClient.submitApplication(appContext)
   }
 
+  def getApplicationReport(appId: ApplicationId) =
+      yarnClient.getApplicationReport(appId)
+
+  def stop = yarnClient.stop
+
   def monitorApplication(appId: ApplicationId): Boolean = {
     val interval = sparkConf.getLong("spark.yarn.report.interval", 1000)
 
     while (true) {
       Thread.sleep(interval)
-      val report = super.getApplicationReport(appId)
+      val report = yarnClient.getApplicationReport(appId)
 
       logInfo("Application report from ASM: \n" +
         "\t application identifier: " + appId.toString() + "\n" +