You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:44 UTC

[28/50] [abbrv] samza git commit: SAMZA-767 - yarn.queue option is not used anywhere

SAMZA-767 - yarn.queue option is not used anywhere


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e8a2ef5e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e8a2ef5e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e8a2ef5e

Branch: refs/heads/samza-sql
Commit: e8a2ef5efac39f56120c19abb50d84433df91b02
Parents: 092e381
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Thu Nov 19 14:52:42 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Thu Nov 19 14:52:42 2015 -0800

----------------------------------------------------------------------
 .../src/main/java/org/apache/samza/config/YarnConfig.java |  8 ++++++++
 .../scala/org/apache/samza/job/yarn/ClientHelper.scala    | 10 +++++++++-
 .../main/scala/org/apache/samza/job/yarn/YarnJob.scala    |  4 ++--
 3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index a572aa2..c556d83 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -35,6 +35,11 @@ public class YarnConfig extends MapConfig {
   private static final int DEFAULT_CONTAINER_MEM = 1024;
 
   /**
+   * Name of YARN queue to run jobs on
+   */
+  public static final String QUEUE_NAME = "yarn.queue";
+
+  /**
    * Number of CPU cores to request from YARN per container
    */
   public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
@@ -144,6 +149,9 @@ public class YarnConfig extends MapConfig {
     return get(AM_JVM_OPTIONS, "");
   }
 
+  public String getQueueName() {
+    return get(QUEUE_NAME, null);
+  }
 
   public String getAMJavaHome() {
     return get(AM_JAVA_HOME, null);

http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index a2b9279..74a0676 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -66,7 +66,7 @@ class ClientHelper(conf: Configuration) extends Logging {
   /**
    * Generate an application and submit it to the resource manager to start an application master
    */
-  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+  def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String], queueName: Option[String]): Option[ApplicationId] = {
     val app = yarnClient.createApplication
     val newAppResponse = app.getNewApplicationResponse
     var mem = memoryMb
@@ -106,6 +106,14 @@ class ClientHelper(conf: Configuration) extends Logging {
       case None => None
     }
 
+    queueName match {
+      case Some(queueName) => {
+        appCtx.setQueue(queueName)
+        info("set yarn queue name to %s" format queueName)
+      }
+      case None => None
+    }
+
     // set the local package so that the containers and app master are provisioned with it
     val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
     val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)

http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 02f46a1..1aa26bb 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -73,8 +73,8 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
         }
         envMapWithJavaHome
       }),
-      Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1)))
-    )
+      Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))),
+      Option(yarnConfig.getQueueName))
     this
   }