You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:44 UTC
[28/50] [abbrv] samza git commit: SAMZA-767 - yarn.queue option is
not used anywhere
SAMZA-767 - yarn.queue option is not used anywhere
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/e8a2ef5e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/e8a2ef5e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/e8a2ef5e
Branch: refs/heads/samza-sql
Commit: e8a2ef5efac39f56120c19abb50d84433df91b02
Parents: 092e381
Author: Aleksandar Pejakovic <a....@levi9.com>
Authored: Thu Nov 19 14:52:42 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Thu Nov 19 14:52:42 2015 -0800
----------------------------------------------------------------------
.../src/main/java/org/apache/samza/config/YarnConfig.java | 8 ++++++++
.../scala/org/apache/samza/job/yarn/ClientHelper.scala | 10 +++++++++-
.../main/scala/org/apache/samza/job/yarn/YarnJob.scala | 4 ++--
3 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
index a572aa2..c556d83 100644
--- a/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
+++ b/samza-yarn/src/main/java/org/apache/samza/config/YarnConfig.java
@@ -35,6 +35,11 @@ public class YarnConfig extends MapConfig {
private static final int DEFAULT_CONTAINER_MEM = 1024;
/**
+ * Name of YARN queue to run jobs on
+ */
+ public static final String QUEUE_NAME = "yarn.queue";
+
+ /**
* Number of CPU cores to request from YARN per container
*/
public static final String CONTAINER_MAX_CPU_CORES = "yarn.container.cpu.cores";
@@ -144,6 +149,9 @@ public class YarnConfig extends MapConfig {
return get(AM_JVM_OPTIONS, "");
}
+ public String getQueueName() {
+ return get(QUEUE_NAME, null);
+ }
public String getAMJavaHome() {
return get(AM_JAVA_HOME, null);
http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index a2b9279..74a0676 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -66,7 +66,7 @@ class ClientHelper(conf: Configuration) extends Logging {
/**
* Generate an application and submit it to the resource manager to start an application master
*/
- def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String]): Option[ApplicationId] = {
+ def submitApplication(packagePath: Path, memoryMb: Int, cpuCore: Int, cmds: List[String], env: Option[Map[String, String]], name: Option[String], queueName: Option[String]): Option[ApplicationId] = {
val app = yarnClient.createApplication
val newAppResponse = app.getNewApplicationResponse
var mem = memoryMb
@@ -106,6 +106,14 @@ class ClientHelper(conf: Configuration) extends Logging {
case None => None
}
+ queueName match {
+ case Some(queueName) => {
+ appCtx.setQueue(queueName)
+ info("set yarn queue name to %s" format queueName)
+ }
+ case None => None
+ }
+
// set the local package so that the containers and app master are provisioned with it
val packageUrl = ConverterUtils.getYarnUrlFromPath(packagePath)
val fileStatus = packagePath.getFileSystem(conf).getFileStatus(packagePath)
http://git-wip-us.apache.org/repos/asf/samza/blob/e8a2ef5e/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 02f46a1..1aa26bb 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -73,8 +73,8 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
}
envMapWithJavaHome
}),
- Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1)))
- )
+ Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))),
+ Option(yarnConfig.getQueueName))
this
}