You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/31 16:07:53 UTC

git commit: SAMZA-60; adding container and task name system properties to run-class.sh.

Updated Branches:
  refs/heads/master f58e09d60 -> 7375e3234


SAMZA-60; adding container and task name system properties to run-class.sh.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/7375e323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/7375e323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/7375e323

Branch: refs/heads/master
Commit: 7375e32347b4cd151fb34f2e2c636f853444d611
Parents: f58e09d
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Thu Oct 31 08:07:37 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Thu Oct 31 08:07:37 2013 -0700

----------------------------------------------------------------------
 docs/learn/documentation/0.7.0/jobs/logging.md  | 24 +++++++++++++++++---
 .../samza/config/ShellCommandConfig.scala       |  8 +++----
 .../apache/samza/container/SamzaContainer.scala |  2 +-
 .../apache/samza/job/ShellCommandBuilder.scala  |  2 +-
 samza-shell/src/main/bash/run-class.sh          |  2 +-
 .../org/apache/samza/config/YarnConfig.scala    |  5 ----
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  3 ++-
 .../job/yarn/SamzaAppMasterTaskManager.scala    |  2 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |  3 ++-
 9 files changed, 33 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/docs/learn/documentation/0.7.0/jobs/logging.md
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/0.7.0/jobs/logging.md b/docs/learn/documentation/0.7.0/jobs/logging.md
index 9ef9ca1..6bb6bf4 100644
--- a/docs/learn/documentation/0.7.0/jobs/logging.md
+++ b/docs/learn/documentation/0.7.0/jobs/logging.md
@@ -3,7 +3,7 @@ layout: page
 title: Logging
 ---
 
-Samza uses [SLF4J](http://www.slf4j.org/) for all of its logging. By default, only slf4j-api is used, so you must add an SLF4J runtime dependency to your Samza packages for whichever underlying logging platform you wish to use.
+Samza uses [SLF4J](http://www.slf4j.org/) for all of its logging. By default, Samza only depends on slf4j-api, so you must add an SLF4J runtime dependency to your Samza packages for whichever underlying logging platform you wish to use.
 
 ### Log4j
 
@@ -24,13 +24,31 @@ Samza's [run-class.sh](packaging.html) script will automatically set the followi
 
     -Dlog4j.configuration=file:$base_dir/lib/log4j.xml
 
+If the SAMZA_OPTS environment variable is not already set, the [run-class.sh](packaging.html) script will also set the following Java system properties:
+
+    -Dsamza.log.dir=$SAMZA_LOG_DIR -Dsamza.container.name=$SAMZA_CONTAINER_NAME
+
+These settings are very useful if you're using a file-based appender. For example, you can use a daily rolling appender by configuring log4j.xml like this:
+
+```
+<appender name="RollingAppender" class="org.apache.log4j.DailyRollingFileAppender">
+   <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
+   <param name="DatePattern" value="'.'yyyy-MM-dd" />
+   <layout class="org.apache.log4j.PatternLayout">
+    <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n" />
+   </layout>
+</appender>
+```
+
+Setting up a file-based appender is recommended as a better alternative to using standard out. Standard out log files (see below) don't roll, and can get quite large if used for logging.
+
 <!-- TODO add notes showing how to use task.opts for gc logging
 #### task.opts
 -->
 
 ### Log Directory
 
-Samza will look for the _SAMZA_\__LOG_\__DIR_ environment variable when it executes. If this variable is defined, all logs will be written to this directory. If the environment variable is empty, or not defined, then Samza will use /tmp. This environment variable can also be referenced inside log4j.xml files.
+Samza will look for the _SAMZA_\__LOG_\__DIR_ environment variable when it executes. If this variable is defined, all logs will be written to this directory. If the environment variable is empty, or not defined, then Samza will use /tmp. The log directory can also be referenced inside log4j.xml files as `${samza.log.dir}` (see above).
 
 ### Garbage Collection Logging
 
@@ -48,6 +66,6 @@ When a Samza job executes on a YARN grid, the _$SAMZA_\__LOG_\__DIR_ environment
 
 #### STDOUT
 
-YARN pipes all STDOUT and STDERR output to logs/stdout and logs/stderr, respectively. These files are never rotated.
+Samza's [ApplicationMaster](../yarn/application-master.html) pipes all STDOUT and STDERR output to logs/stdout and logs/stderr, respectively. These files are never rotated.
 
 ## [Application Master &raquo;](../yarn/application-master.html)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
index fd8dab8..27ba5e4 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/ShellCommandConfig.scala
@@ -23,17 +23,17 @@ object ShellCommandConfig {
   /**
    * This environment variable is used to store a JSON serialized map of all configuration.
    */
-  val ENV_CONFIG = "STREAMING_CONFIG"
+  val ENV_CONFIG = "SAMZA_CONFIG"
 
   /**
    * A CSV list of partition IDs that a TaskRunner is responsible for (e.g. 0,2,4,6).
    */
-  val ENV_PARTITION_IDS = "PARTITION_IDS"
+  val ENV_PARTITION_IDS = "SAMZA_PARTITION_IDS"
 
   /**
-   * A name for a TaskRunner.
+   * The name for a container (either a YARN AM or SamzaContainer).
    */
-  val ENV_TASK_NAME = "TASK_NAME"
+  val ENV_CONTAINER_NAME = "SAMZA_CONTAINER_NAME"
 
   /**
    * Arguments to be passed to the process running the TaskRunner (or equivalent, for non-JVM languages).

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 6337a2a..aefec27 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -65,7 +65,7 @@ import org.apache.samza.system.chooser.RoundRobinChooserFactory
 object SamzaContainer extends Logging {
   def main(args: Array[String]) {
     val jmxServer = new JmxServer
-    val containerName = System.getenv(ShellCommandConfig.ENV_TASK_NAME)
+    val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
     val configStr = System.getenv(ShellCommandConfig.ENV_CONFIG)
     val config = JsonConfigSerializer.fromJson(configStr)
     val partitionIdsCsv = System.getenv(ShellCommandConfig.ENV_PARTITION_IDS)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
index f55ca4c..09b2d22 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/ShellCommandBuilder.scala
@@ -31,7 +31,7 @@ class ShellCommandBuilder extends CommandBuilder {
     val parts = if (partitions.size() > 0) partitions.map(_.getPartitionId.toString).reduceLeft(_ + "," + _) else ""
 
     Map(
-      ShellCommandConfig.ENV_TASK_NAME -> name,
+      ShellCommandConfig.ENV_CONTAINER_NAME -> name,
       ShellCommandConfig.ENV_PARTITION_IDS -> parts,
       ShellCommandConfig.ENV_CONFIG -> JsonConfigSerializer.toJson(config),
       ShellCommandConfig.ENV_SAMZA_OPTS -> config.getTaskOpts.getOrElse(""))

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-shell/src/main/bash/run-class.sh
----------------------------------------------------------------------
diff --git a/samza-shell/src/main/bash/run-class.sh b/samza-shell/src/main/bash/run-class.sh
index e91bf99..bda13fc 100755
--- a/samza-shell/src/main/bash/run-class.sh
+++ b/samza-shell/src/main/bash/run-class.sh
@@ -51,7 +51,7 @@ if [ -z "$SAMZA_LOG_DIR" ]; then
 fi
 
 if [ -z "$SAMZA_OPTS" ]; then
-  SAMZA_OPTS="-Xmx160M -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log"
+  SAMZA_OPTS="-Xmx160M -XX:+PrintGCDateStamps -Xloggc:$SAMZA_LOG_DIR/gc.log -Dsamza.log.dir=$SAMZA_LOG_DIR -Dsamza.container.name=$SAMZA_CONTAINER_NAME"
   if [ -f $base_dir/lib/log4j.xml ]; then
     SAMZA_OPTS="$SAMZA_OPTS -Dlog4j.configuration=file:$base_dir/lib/log4j.xml"
   fi

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
index 4779edb..6c3aa92 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/config/YarnConfig.scala
@@ -20,11 +20,6 @@
 package org.apache.samza.config
 
 object YarnConfig {
-  // environment variables
-  val ENV_CONFIG = "STREAMING_CONFIG"
-  val ENV_PARTITION_ID = "PARTITION_ID"
-  val ENV_CONTAINER_NAME = "CONTAINER_NAME"
-
   // yarn job config
   val PACKAGE_PATH = "yarn.package.path"
   val CONTAINER_MAX_MEMORY_MB = "yarn.container.memory.mb"

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 6d72d86..de6887d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -32,6 +32,7 @@ import org.apache.samza.config.YarnConfig._
 import org.apache.samza.job.yarn.SamzaAppMasterTaskManager._
 import org.apache.samza.util.hadoop.HttpFileSystem
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.samza.config.ShellCommandConfig
 
 /**
  * When YARN executes an application master, it needs a bash command to
@@ -57,7 +58,7 @@ object SamzaAppMaster extends Logging {
     info("got node manager port: %s" format nodePortString)
     val nodeHttpPortString = System.getenv(ApplicationConstants.Environment.NM_HTTP_PORT.toString)
     info("got node manager http port: %s" format nodeHttpPortString)
-    val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(YarnConfig.ENV_CONFIG)))
+    val config = new MapConfig(JsonConfigSerializer.fromJson(System.getenv(ShellCommandConfig.ENV_CONFIG)))
     info("got config: %s" format config)
     val hConfig = new YarnConfiguration
     hConfig.set("fs.http.impl", classOf[HttpFileSystem].getName)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
index 51097c1..80d2972 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterTaskManager.scala
@@ -131,7 +131,7 @@ class SamzaAppMasterTaskManager(clock: () => Long, config: Config, state: SamzaA
         val cmdBuilderClassName = config.getCommandClass.getOrElse(classOf[ShellCommandBuilder].getName)
         val cmdBuilder = Class.forName(cmdBuilderClassName).newInstance.asInstanceOf[CommandBuilder]
           .setConfig(config)
-          .setName("SamzaContainer-%s" format taskId)
+          .setName("samza-container-%s" format taskId)
           .setPartitions(partitionsForTask)
           .setTotalPartitions(partitions.size)
         val command = cmdBuilder.buildCommand

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/7375e323/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
index 3f03399..b79c0aa 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJob.scala
@@ -53,7 +53,8 @@ class YarnJob(config: Config, hadoopConfig: Configuration) extends StreamJob {
         "export SAMZA_LOG_DIR=%s && ln -sfn %s logs && exec ./__package/bin/run-am.sh 1>logs/%s 2>logs/%s"
           format (ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.LOG_DIR_EXPANSION_VAR, ApplicationConstants.STDOUT, ApplicationConstants.STDERR)),
       Some(Map(
-        YarnConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),
+        ShellCommandConfig.ENV_CONFIG -> Util.envVarEscape(JsonConfigSerializer.toJson(config)),
+        ShellCommandConfig.ENV_CONTAINER_NAME -> Util.envVarEscape("application-master"),
         ShellCommandConfig.ENV_SAMZA_OPTS -> Util.envVarEscape(config.getAmOpts.getOrElse("")))),
       Some("%s_%s" format (config.getName.get, config.getJobId.getOrElse(1))))