You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/03 08:16:31 UTC

[19/32] git commit: merge yarn/scheduler yarn/common code into one directory

merge yarn/scheduler yarn/common code into one directory


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c5422e02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c5422e02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c5422e02

Branch: refs/heads/master
Commit: c5422e02b868dd69d1078c246ba15e4a02a7b8b9
Parents: ad60710
Author: Raymond Liu <ra...@intel.com>
Authored: Mon Dec 23 10:33:33 2013 +0800
Committer: Raymond Liu <ra...@intel.com>
Committed: Fri Jan 3 12:14:37 2014 +0800

----------------------------------------------------------------------
 .../cluster/YarnClientClusterScheduler.scala    |  48 ++++++++
 .../cluster/YarnClientSchedulerBackend.scala    | 110 +++++++++++++++++++
 .../cluster/YarnClusterScheduler.scala          |  56 ++++++++++
 .../cluster/YarnClientClusterScheduler.scala    |  48 --------
 .../cluster/YarnClientSchedulerBackend.scala    | 110 -------------------
 .../cluster/YarnClusterScheduler.scala          |  56 ----------
 6 files changed, 214 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c5422e02/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
new file mode 100644
index 0000000..522e0a9
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.hadoop.conf.Configuration
+import org.apache.spark.deploy.yarn.YarnAllocationHandler
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+
+/**
+ * Scheduler that launches workers through Yarn by calling into Client to launch
+ * WorkerLauncher as the application master (AM).
+ */
+private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
+
+  // Convenience constructor that uses a freshly created Hadoop Configuration.
+  def this(sc: SparkContext) = this(sc, new Configuration())
+
+  // Resolve the rack through Yarn; a null result from lookupRack becomes None.
+  override def getRackForHost(hostPort: String): Option[String] = {
+    val host = Utils.parseHostPort(hostPort)._1
+    Option(YarnAllocationHandler.lookupRack(conf, host))
+  }
+
+  override def postStartHook() {
+    // The Yarn application is running, but the workers might not be ready yet.
+    // Best-effort: wait a few seconds for the slaves to bootstrap and register with master.
+    Thread.sleep(2000L)
+    logInfo("YarnClientClusterScheduler.postStartHook done")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c5422e02/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
new file mode 100644
index 0000000..4b69f50
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.spark.{SparkException, Logging, SparkContext}
+import org.apache.spark.deploy.yarn.{Client, ClientArguments}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
+private[spark] class YarnClientSchedulerBackend(
+    scheduler: TaskSchedulerImpl,
+    sc: SparkContext)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  // Both stay null until start() has created the Yarn client and run the application.
+  var client: Client = null
+  var appId: ApplicationId = null
+
+  /**
+   * Starts the backend, runs a Yarn application through Client with WorkerLauncher as the
+   * master class, and blocks in waitForApp() until that application is reported RUNNING.
+   * Throws SparkException when the SPARK_YARN_APP_JAR environment variable is not set.
+   */
+  override def start() {
+    super.start()
+
+    val userJar = System.getenv("SPARK_YARN_APP_JAR")
+    if (userJar == null) {
+      throw new SparkException("env SPARK_YARN_APP_JAR is not set")
+    }
+
+    // Worker sizing comes from the environment, falling back to defaults when unset.
+    val workerCores = Option(System.getenv("SPARK_WORKER_CORES")).getOrElse("2")
+    val workerMemory = Option(System.getenv("SPARK_WORKER_MEMORY")).getOrElse("512m")
+    val workerNumber = Option(System.getenv("SPARK_WORKER_INSTANCES")).getOrElse("1")
+
+    // The driver's host:port is passed to the launched application through "--args".
+    val hostport = conf.get("spark.driver.host") + ":" + conf.get("spark.driver.port")
+
+    val argsArray = Array[String](
+      "--class", "notused",
+      "--jar", userJar,
+      "--args", hostport,
+      "--worker-memory", workerMemory,
+      "--worker-cores", workerCores,
+      "--num-workers", workerNumber,
+      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
+    )
+
+    client = new Client(new ClientArguments(argsArray))
+    appId = client.runApp()
+    waitForApp()
+  }
+
+  /**
+   * Polls the application report, sleeping one second between polls, until it is RUNNING.
+   * Throws SparkException if the application reaches FINISHED, FAILED or KILLED first.
+   */
+  def waitForApp() {
+    // TODO : need a better way to find out whether the workers are ready or not
+    // maybe by resource usage report?
+    while (true) {
+      val report = client.getApplicationReport(appId)
+      val state = report.getYarnApplicationState()
+
+      logInfo("Application report from ASM: \n" +
+        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
+        "\t appStartTime: " + report.getStartTime() + "\n" +
+        "\t yarnAppState: " + state + "\n"
+      )
+
+      // Ready to go, or already gone.
+      if (state == YarnApplicationState.RUNNING) {
+        return
+      } else if (state == YarnApplicationState.FINISHED ||
+        state == YarnApplicationState.FAILED ||
+        state == YarnApplicationState.KILLED) {
+        throw new SparkException("Yarn application already ended, " +
+          "might be killed or not able to launch application master.")
+      }
+
+      Thread.sleep(1000)
+    }
+  }
+
+  /** Stops the backend, then the Yarn client if start() got far enough to create one. */
+  override def stop() {
+    super.stop()
+    // client is still null when start() failed before creating it; avoid an NPE on shutdown.
+    if (client != null) client.stop()
+    logInfo("Stopped")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c5422e02/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
new file mode 100644
index 0000000..a4638cc
--- /dev/null
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.spark._
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
+import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.util.Utils
+import org.apache.hadoop.conf.Configuration
+
+/**
+ * A simple extension of TaskSchedulerImpl that ensures the appropriate initialization of
+ * ApplicationMaster, etc. is done.
+ */
+private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
+  extends TaskSchedulerImpl(sc) {
+
+  logInfo("Created YarnClusterScheduler")
+
+  def this(sc: SparkContext) = this(sc, new Configuration())
+
+  // Nothing else for now ... initialize the application master, which needs the
+  // SparkContext to determine how to allocate.
+  // Note that only the first SparkContext created has any influence (ideally there is only
+  // one); subsequent creations are ignored because nodes are already allocated by then.
+
+  // Resolve the rack through Yarn; a null result from lookupRack becomes None.
+  override def getRackForHost(hostPort: String): Option[String] = {
+    val host = Utils.parseHostPort(hostPort)._1
+    Option(YarnAllocationHandler.lookupRack(conf, host))
+  }
+
+  override def postStartHook() {
+    // Only wait when ApplicationMaster.sparkContextInitialized(sc) returns true.
+    if (ApplicationMaster.sparkContextInitialized(sc)) {
+      // Best-effort: wait a few seconds for the slaves to bootstrap and register with master.
+      Thread.sleep(3000L)
+    }
+    logInfo("YarnClusterScheduler.postStartHook done")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c5422e02/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
deleted file mode 100644
index 522e0a9..0000000
--- a/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.deploy.yarn.YarnAllocationHandler
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-
-/**
- *
- * This scheduler launch worker through Yarn - by call into Client to launch WorkerLauncher as AM.
- */
-private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
-  def this(sc: SparkContext) = this(sc, new Configuration())
-
-  // By default, rack is unknown
-  override def getRackForHost(hostPort: String): Option[String] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
-    if (retval != null) Some(retval) else None
-  }
-
-  override def postStartHook() {
-
-    // The yarn application is running, but the worker might not yet ready
-    // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-    Thread.sleep(2000L)
-    logInfo("YarnClientClusterScheduler.postStartHook done")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c5422e02/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
deleted file mode 100644
index 4b69f50..0000000
--- a/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
-import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-
-private[spark] class YarnClientSchedulerBackend(
-    scheduler: TaskSchedulerImpl,
-    sc: SparkContext)
-  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
-  with Logging {
-
-  var client: Client = null
-  var appId: ApplicationId = null
-
-  override def start() {
-    super.start()
-
-    val defalutWorkerCores = "2"
-    val defalutWorkerMemory = "512m"
-    val defaultWorkerNumber = "1"
-
-    val userJar = System.getenv("SPARK_YARN_APP_JAR")
-    var workerCores = System.getenv("SPARK_WORKER_CORES")
-    var workerMemory = System.getenv("SPARK_WORKER_MEMORY")
-    var workerNumber = System.getenv("SPARK_WORKER_INSTANCES")
-
-    if (userJar == null)
-      throw new SparkException("env SPARK_YARN_APP_JAR is not set")
-
-    if (workerCores == null)
-      workerCores = defalutWorkerCores
-    if (workerMemory == null)
-      workerMemory = defalutWorkerMemory
-    if (workerNumber == null)
-      workerNumber = defaultWorkerNumber
-
-    val driverHost = conf.get("spark.driver.host")
-    val driverPort = conf.get("spark.driver.port")
-    val hostport = driverHost + ":" + driverPort
-
-    val argsArray = Array[String](
-      "--class", "notused",
-      "--jar", userJar,
-      "--args", hostport,
-      "--worker-memory", workerMemory,
-      "--worker-cores", workerCores,
-      "--num-workers", workerNumber,
-      "--master-class", "org.apache.spark.deploy.yarn.WorkerLauncher"
-    )
-
-    val args = new ClientArguments(argsArray)
-    client = new Client(args)
-    appId = client.runApp()
-    waitForApp()
-  }
-
-  def waitForApp() {
-
-    // TODO : need a better way to find out whether the workers are ready or not
-    // maybe by resource usage report?
-    while(true) {
-      val report = client.getApplicationReport(appId)
-
-      logInfo("Application report from ASM: \n" +
-        "\t appMasterRpcPort: " + report.getRpcPort() + "\n" +
-        "\t appStartTime: " + report.getStartTime() + "\n" +
-        "\t yarnAppState: " + report.getYarnApplicationState() + "\n"
-      )
-
-      // Ready to go, or already gone.
-      val state = report.getYarnApplicationState()
-      if (state == YarnApplicationState.RUNNING) {
-        return
-      } else if (state == YarnApplicationState.FINISHED ||
-        state == YarnApplicationState.FAILED ||
-        state == YarnApplicationState.KILLED) {
-        throw new SparkException("Yarn application already ended," +
-          "might be killed or not able to launch application master.")
-      }
-
-      Thread.sleep(1000)
-    }
-  }
-
-  override def stop() {
-    super.stop()
-    client.stop()
-    logInfo("Stoped")
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c5422e02/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
deleted file mode 100644
index a4638cc..0000000
--- a/yarn/scheduler/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster
-
-import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnAllocationHandler}
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.util.Utils
-import org.apache.hadoop.conf.Configuration
-
-/**
- *
- * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of ApplicationMaster, etc is done
- */
-private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) extends TaskSchedulerImpl(sc) {
-
-  logInfo("Created YarnClusterScheduler")
-
-  def this(sc: SparkContext) = this(sc, new Configuration())
-
-  // Nothing else for now ... initialize application master : which needs sparkContext to determine how to allocate
-  // Note that only the first creation of SparkContext influences (and ideally, there must be only one SparkContext, right ?)
-  // Subsequent creations are ignored - since nodes are already allocated by then.
-
-
-  // By default, rack is unknown
-  override def getRackForHost(hostPort: String): Option[String] = {
-    val host = Utils.parseHostPort(hostPort)._1
-    val retval = YarnAllocationHandler.lookupRack(conf, host)
-    if (retval != null) Some(retval) else None
-  }
-
-  override def postStartHook() {
-    val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
-    if (sparkContextInitialized){
-      // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-      Thread.sleep(3000L)
-    }
-    logInfo("YarnClusterScheduler.postStartHook done")
-  }
-}