You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC

svn commit: r1457129 [2/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...

Added: incubator/tez/tez-ampool/src/main/bin/tez-config.sh
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/bin/tez-config.sh?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/bin/tez-config.sh (added)
+++ incubator/tez/tez-ampool/src/main/bin/tez-config.sh Fri Mar 15 21:26:36 2013
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Sourced by all the Tez scripts ("source" / "."); it is not meant to be
+# executed directly.  Do not pass it arguments of its own when sourcing it,
+# since it needs to see the caller's original $*.
+
+# Resolve links ($0 may be a softlink) and convert a relative path
+# to an absolute path.  NB: The -P option requires bash built-ins
+# or POSIX:2001 compliant cd and pwd.
+
+#   TEZ_CLASSPATH Extra Java CLASSPATH entries.
+#
+
+this="${BASH_SOURCE-$0}"   # path of this file; $0 is used if BASH_SOURCE is unset
+tez_bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P)   # its directory, symlinks resolved
+script="$(basename -- "$this")"
+this="$tez_bin/$script"
+# Sub-directories of TEZ_HOME that are added to the CLASSPATH further down.
+TEZ_DIR=${TEZ_DIR:-""}
+TEZ_LIB_DIR=${TEZ_LIB_DIR:-"lib"}
+# TEZ_HOME defaults to the parent of the directory holding this file.
+TEZ_DEFAULT_HOME=$(cd -P -- "$tez_bin"/.. && pwd -P)
+TEZ_HOME=${TEZ_HOME:-$TEZ_DEFAULT_HOME}
+export TEZ_HOME
+
+# If the arguments start with "--config <dir>", use <dir> as TEZ_CONF_DIR and shift both away.
+if [ $# -gt 1 ]
+then
+  if [ "--config" = "$1" ]
+  then
+    shift
+    confdir=$1
+    shift
+    TEZ_CONF_DIR=$confdir
+  fi
+fi
+
+export TEZ_CONF_DIR="${TEZ_CONF_DIR:-$TEZ_HOME/conf}"
+# Apply site-specific settings when the conf dir provides them.
+if [ -f "${TEZ_CONF_DIR}/tez-env.sh" ]; then
+  . "${TEZ_CONF_DIR}/tez-env.sh"
+fi
+
+# Newer versions of glibc use an arena memory allocator that causes virtual
+# memory usage to explode; default MALLOC_ARENA_MAX to 4 unless already set.
+export MALLOC_ARENA_MAX=${MALLOC_ARENA_MAX:-4}
+
+# Attempt to set JAVA_HOME if it is not set
+if [[ -z $JAVA_HOME ]]; then
+  # On OSX use java_home (or /Library for older versions).  Assign a plain
+  # string: bash does not export array variables to child processes.
+  if [ "Darwin" == "$(uname -s)" ]; then
+    if [ -x /usr/libexec/java_home ]; then
+      export JAVA_HOME="$(/usr/libexec/java_home)"
+    else
+      export JAVA_HOME=/Library/Java/Home
+    fi
+  fi
+  # Bail if we did not detect it (exit also ends the script sourcing this file)
+  if [[ -z $JAVA_HOME ]]; then
+    echo "Error: JAVA_HOME is not set and could not be found." 1>&2
+    exit 1
+  fi
+fi
+
+JAVA=$JAVA_HOME/bin/java   # java launcher taken from JAVA_HOME
+# Default maximum heap for the JVM
+JAVA_HEAP_MAX=-Xmx1024m
+
+# TEZ_HEAPSIZE, when set, overrides the default heap; the value is in MB ("m" suffix)
+if [[ "$TEZ_HEAPSIZE" != "" ]]; then
+  #echo "run with heapsize $TEZ_HEAPSIZE"
+  JAVA_HEAP_MAX="-Xmx""$TEZ_HEAPSIZE""m"
+  #echo $JAVA_HEAP_MAX
+fi
+
+# CLASSPATH initially contains $TEZ_CONF_DIR
+CLASSPATH="${TEZ_CONF_DIR}"
+# With TEZ_USER_CLASSPATH_FIRST set, user entries go right after the conf dir.
+if [ "$TEZ_USER_CLASSPATH_FIRST" != "" ] && [ "$TEZ_CLASSPATH" != "" ] ; then
+  CLASSPATH=${CLASSPATH}:${TEZ_CLASSPATH}
+fi
+# Add TEZ_HOME/TEZ_DIR/* and TEZ_HOME/TEZ_LIB_DIR/*; the quoted '/*' is kept literal, not expanded by the shell.
+CLASSPATH=${CLASSPATH}:${TEZ_HOME}/${TEZ_DIR}'/*':${TEZ_HOME}/${TEZ_LIB_DIR}'/*'
+
+# Disable word splitting while the classpath is assembled (IFS is restored below)
+IFS=
+# Locate the hadoop launcher: $HADOOP_HOME/bin/hadoop if set, otherwise search $PATH
+if [[ -z $HADOOP_HOME ]]; then
+  HADOOP_PATH=$(command -v hadoop)
+  result=$?
+  if [ "$result" != "0" ]; then
+    echo 'Failed to find hadoop in $PATH.' 1>&2
+    echo 'Please ensure that HADOOP_HOME is defined or hadoop is in your $PATH.' 1>&2
+    exit 1
+  fi
+else
+  HADOOP_PATH="$HADOOP_HOME/bin/hadoop"
+fi
+
+HADOOP_CLASSPATH=$("$HADOOP_PATH" classpath)
+result=$?
+if [ "$result" != "0" ]; then
+   echo "Failed to run $HADOOP_PATH classpath. Result=$result." 1>&2
+   echo 'Please ensure that HADOOP_HOME is defined or hadoop is in your $PATH.' 1>&2
+   exit 1
+fi
+CLASSPATH=${CLASSPATH}:$HADOOP_CLASSPATH
+
+# add user-specified CLASSPATH last
+if [ "$TEZ_USER_CLASSPATH_FIRST" = "" ] && [ "$TEZ_CLASSPATH" != "" ]; then
+  CLASSPATH=${CLASSPATH}:${TEZ_CLASSPATH}
+fi
+
+# default log directory
+if [ "$TEZ_LOG_DIR" = "" ]; then
+  TEZ_LOG_DIR="$TEZ_HOME/logs"
+fi
+
+# restore ordinary behaviour
+unset IFS
+# Diagnostics above go to stderr, so stdout carries only this classpath line
+echo "$CLASSPATH"

Added: incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh (added)
+++ incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh Fri Mar 15 21:26:36 2013
@@ -0,0 +1,177 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Runs a Tez command as a daemon.
+#
+# Environment Variables
+#
+#   TEZ_CONF_DIR  Alternate conf dir. Default is ${TEZ_HOME}/conf.
+#   TEZ_LOG_DIR   Where log files are stored.  ${TEZ_HOME}/logs by default.
+#   TEZ_PID_DIR   Where the pid files are stored. /tmp by default.
+#   TEZ_IDENT_STRING   A string representing this instance of tez. $USER by default
+#   TEZ_NICENESS The scheduling priority for daemons. Defaults to 0.
+##
+
+usage="Usage: tez-daemon.sh [--config <conf-dir>] (start|stop) <tez-command> <args...>"
+
+# at least two arguments (start|stop and the command) are required
+if [ $# -le 1 ]; then
+  echo "$usage"
+  exit 1
+fi
+
+bin=$(dirname "${BASH_SOURCE-$0}")
+bin=$(cd "$bin" && pwd)
+
+DEFAULT_LIBEXEC_DIR="$bin"/../libexec
+TEZ_LIBEXEC_DIR=${TEZ_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
+. "$TEZ_LIBEXEC_DIR/tez-config.sh"   # quoted so a path with spaces still works
+
+# tez-config.sh has already shifted away any leading "--config <conf-dir>";
+# what remains is (start|stop) followed by the command name.
+startStop=$1
+shift
+command=$1
+shift
+
+# Rotate log file $1: $1.(N-1) -> $1.N for N = $2 (default 5) down to 2,
+# then $1 -> $1.1, so at most N rotated copies are kept.
+# Does nothing when $1 does not exist.
+tez_rotate_log ()
+{
+  # locals keep this function from clobbering the caller's $log/$num/$prev
+  local log=$1 num=${2:-5} prev
+  if [ -f "$log" ]; then # rotate logs
+    while [ "$num" -gt 1 ]; do
+      prev=$((num - 1))
+      [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+      num=$prev
+    done
+    mv "$log" "$log.$num"
+  fi
+}
+
+if [ -f "${TEZ_CONF_DIR}/tez-env.sh" ]; then
+  . "${TEZ_CONF_DIR}/tez-env.sh"
+fi
+
+if [ "$TEZ_IDENT_STRING" = "" ]; then
+  export TEZ_IDENT_STRING="$USER"
+fi
+
+# get log directory
+if [ "$TEZ_LOG_DIR" = "" ]; then
+  export TEZ_LOG_DIR="$TEZ_HOME/logs"
+fi
+# Create the log directory if it is missing or not writable, and chown it to TEZ_IDENT_STRING.
+if [ ! -w "$TEZ_LOG_DIR" ] ; then
+  mkdir -p "$TEZ_LOG_DIR"
+  chown "$TEZ_IDENT_STRING" "$TEZ_LOG_DIR"
+fi
+
+if [ "$TEZ_PID_DIR" = "" ]; then
+  TEZ_PID_DIR=/tmp
+fi
+
+# Per-daemon names: log file name, stdout/stderr capture file ($log) and pid file ($pid)
+export TEZ_LOGFILE=tez-$TEZ_IDENT_STRING-$command-$HOSTNAME.log
+export TEZ_ROOT_LOGGER=${TEZ_ROOT_LOGGER:-"INFO,RFA"}
+log=$TEZ_LOG_DIR/tez-$TEZ_IDENT_STRING-$command-$HOSTNAME.out
+pid=$TEZ_PID_DIR/tez-$TEZ_IDENT_STRING-$command.pid
+TEZ_STOP_TIMEOUT=${TEZ_STOP_TIMEOUT:-5}
+
+# Set default scheduling priority
+if [ "$TEZ_NICENESS" = "" ]; then
+    export TEZ_NICENESS=0
+fi
+# Pass the settings to the JVM as -Dtez.* system properties.
+TEZ_OPTS="$TEZ_OPTS -Dtez.log.dir=$TEZ_LOG_DIR"
+TEZ_OPTS="$TEZ_OPTS -Dtez.log.file=$TEZ_LOGFILE"
+TEZ_OPTS="$TEZ_OPTS -Dtez.home.dir=$TEZ_HOME"
+TEZ_OPTS="$TEZ_OPTS -Dtez.id.str=$TEZ_IDENT_STRING"
+TEZ_OPTS="$TEZ_OPTS -Dtez.root.logger=${TEZ_ROOT_LOGGER:-INFO,console}"   # fallback unused: defaulted to INFO,RFA above
+
+case $startStop in
+
+  (start)
+    # Ensure the pid dir exists; refuse to start while the pid file names a live process.
+    [ -w "$TEZ_PID_DIR" ] || mkdir -p "$TEZ_PID_DIR"
+    if [ -f "$pid" ]; then
+      if kill -0 "$(cat "$pid")" > /dev/null 2>&1; then
+        echo "$command running as process $(cat "$pid"). Stop it first."
+        exit 1
+      fi
+    fi
+
+    tez_rotate_log "$log"
+    echo "starting $command, logging to $log"
+    cd "$TEZ_HOME" || exit 1
+    case $command in
+      ampoolservice)
+        export CLASSPATH=$CLASSPATH
+        # JAVA_HEAP_MAX and TEZ_OPTS stay unquoted so they split into separate options.
+        nohup nice -n "$TEZ_NICENESS" "$JAVA" $JAVA_HEAP_MAX $TEZ_OPTS \
+          org.apache.tez.ampool.AMPoolService --cli \
+          > "$log" 2>&1 < /dev/null &
+      ;;
+      (*)
+         echo "Unknown command $command"
+         echo "Only command 'ampoolservice' is supported"
+         exit 1
+      ;;
+    esac
+    # Record the pid, append ulimit info, show the log head, and fail if the daemon already died.
+    echo $! > "$pid"
+    sleep 1
+    echo "ulimit -a for user $USER" >> "$log"
+    ulimit -a >> "$log" 2>&1
+    head -30 "$log"
+    sleep 3;
+    if ! ps -p $! > /dev/null ; then
+      exit 1
+    fi
+    ;;
+
+  (stop)
+    # Send TERM, wait TEZ_STOP_TIMEOUT seconds, then KILL if the process is still alive.
+    if [ -f "$pid" ]; then
+      TARGET_PID=$(cat "$pid")
+      if kill -0 "$TARGET_PID" > /dev/null 2>&1; then
+        echo "stopping $command"
+        kill "$TARGET_PID"
+        sleep "$TEZ_STOP_TIMEOUT"
+        if kill -0 "$TARGET_PID" > /dev/null 2>&1; then
+          echo "$command did not stop gracefully after $TEZ_STOP_TIMEOUT seconds: killing with kill -9"
+          kill -9 "$TARGET_PID"
+        fi
+      else
+        echo "no $command to stop"
+      fi
+      rm -f "$pid"   # drop the pid file so a recycled pid cannot block a later start
+    else
+      echo "no $command to stop"
+    fi
+    ;;
+
+  (*)
+    echo "$usage"
+    exit 1
+    ;;
+esac
+
+

Propchange: incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml (added)
+++ incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+
+  <!-- Uncomment and set values as needed.
+       The commented-out properties below duplicate the default settings -->
+
+  <!--
+  <property>
+     <description>How often to poll AMPoolService for job assignment</description>
+     <name>yarn.app.mapreduce.am.lazy.polling-interval.secs</name>
+     <value>1</value>
+  </property>
+
+  <property>
+     <description>How many containers to pre-allocate after starting up</description>
+     <name>yarn.app.mapreduce.am.lazy.prealloc-container-count</name>
+     <value>0</value>
+  </property>
+  -->
+
+</configuration>

Added: incubator/tez/tez-ampool/src/main/conf/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/log4j.properties?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/log4j.properties (added)
+++ incubator/tez/tez-ampool/src/main/conf/log4j.properties Fri Mar 15 21:26:36 2013
@@ -0,0 +1,80 @@
+# Copyright 2011 The Apache Software Foundation
+# 
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+tez.root.logger=INFO,console
+tez.log.dir=.
+tez.log.file=tez.log
+
+# Define the root logger to the system property "tez.root.logger".
+log4j.rootLogger=${tez.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+# Null Appender
+log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
+
+#
+# Rolling File Appender - caps space usage at about 2.75GB (256MB x (10 backups + 1 active file)).
+#
+tez.log.maxfilesize=256MB
+tez.log.maxbackupindex=10
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.File=${tez.log.dir}/${tez.log.file}
+
+log4j.appender.RFA.MaxFileSize=${tez.log.maxfilesize}
+log4j.appender.RFA.MaxBackupIndex=${tez.log.maxbackupindex}
+
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${tez.log.dir}/${tez.log.file}
+
+# Roll over at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this 
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n

Added: incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml (added)
+++ incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,119 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+<configuration>
+
+  <!-- Uncomment and set values as needed.
+       The commented-out properties below duplicate the default settings -->
+
+  <!--
+  <property>
+    <description>Memory to use for AMPoolService AM</description>
+    <name>tez.ampool.am.master_memory</name>
+    <value>1024</value>
+  </property>
+
+  <property>
+    <description>Queue to launch AMPoolService AM</description>
+    <name>tez.ampool.am.master_queue</name>
+    <value>default</value>
+  </property>
+
+  <property>
+    <description>Port to use for AMPoolService status</description>
+    <name>tez.ampool.ws.port</name>
+    <value>12999</value>
+  </property>
+
+  <property>
+    <description>Minimum size of AM Pool</description>
+    <name>tez.ampool.am-pool-size</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <description>Maximum size of AM Pool</description>
+    <name>tez.ampool.max-am-pool-size</name>
+    <value>5</value>
+  </property>
+
+  <property>
+    <description>When to launch new AM. If true, launched after an existing AM in pool completes. Else, launched as soon as a job is assigned to an AM from the pool</description>
+    <name>tez.ampool.launch-new-am-after-app-completion</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <description>How many launch failures to account for unassigned AMs before shutting down AMPoolService</description>
+    <name>tez.ampool.max-am-launch-failures</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>No. of threads to use to serve the ClientRMProtocol proxy fronted by the AMPoolService</description>
+    <name>tez.ampool.rm-proxy-client.thread-count</name>
+    <value>10</value>
+  </property>
+
+  <property>
+    <description>Address on which to run the ClientRMProtocol proxy</description>
+    <name>tez.ampool.address</name>
+    <value>0.0.0.0:10030</value>
+  </property>
+
+  <property>
+    <description>Memory to use when launching the lazy MR AM</description>
+    <name>tez.ampool.mr-am.memory-allocation-mb</name>
+    <value>1536</value>
+  </property>
+
+  <property>
+    <description>Queue to which the Lazy MRAM is to be submitted to</description>
+    <name>tez.ampool.mr-am.queue-name</name>
+    <value>default</value>
+  </property>
+
+  <property>
+    <description>Comma-separated paths to job jars on DFS (optional)</description>
+    <name>tez.ampool.mr-am.job-jar-path</name>
+    <value></value>
+  </property>
+
+  <property>
+    <description>Main class for Lazy MRAM</description>
+    <name>tez.ampool.mr-am.application-master-class</name>
+    <value>org.apache.hadoop.mapreduce.v2.app2.lazy.LazyMRAppMaster</value>
+  </property>
+
+  <property>
+    <description>Local filesystem path for staging local data used by AMPoolClient/AMPoolService</description>
+    <name>tez.ampool.tmp-dir-path</name>
+    <value>/tmp/ampoolservice/</value>
+  </property>
+
+  <property>
+    <description>Path on FS used by AMPoolService to upload lazy-mr-am config</description>
+    <name>tez.ampool.am.staging-dir</name>
+    <value>/tmp/tez/ampool/staging/</value>
+  </property>
+  -->
+
+</configuration>
+

Added: incubator/tez/tez-ampool/src/main/conf/tez-env.sh
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/tez-env.sh?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/tez-env.sh (added)
+++ incubator/tez/tez-ampool/src/main/conf/tez-env.sh Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set Tez-specific environment variables here.
+
+# JAVA_HOME is required (tez-config.sh only detects it on OS X); set HADOOP_HOME
+# unless hadoop is on your PATH.  All other variables are optional.
+
+# The java implementation to use.
+export JAVA_HOME=${JAVA_HOME}
+
+# Set HADOOP_HOME as needed by Tez
+#export HADOOP_HOME=
+
+# The maximum amount of heap to use, in MB. Default is 1024.
+#export TEZ_HEAPSIZE=
+
+# Where log files are stored.  $TEZ_HOME/logs by default.
+#export TEZ_LOG_DIR=${TEZ_LOG_DIR}/$USER
+
+# The directory where pid files are stored. /tmp by default.
+# NOTE: this should be set to a directory that can only be written to by
+#       the user that will run the tez daemons.  Otherwise there is the
+#       potential for a symlink attack.
+export TEZ_PID_DIR=${TEZ_PID_DIR}
+
+# A string representing this instance of tez. $USER unless already set in the environment.
+export TEZ_IDENT_STRING=${TEZ_IDENT_STRING:-$USER}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/** Configuration whose static initializer registers the lazy MR AM resource files as default resources. */
+public class LazyAMConfig extends Configuration {
+  // Resource file names registered by the static initializer below.
+  private static final String LAZY_AM_DEFAULT_XML_FILE = "lazy-mram-default.xml";
+  private static final String LAZY_AM_SITE_XML_FILE = "lazy-mram-site.xml";
+  public static final String LAZY_AM_JOB_XML_FILE = "lazy-mram-job.xml";
+
+  static {
+    Configuration.addDefaultResource(LAZY_AM_DEFAULT_XML_FILE);
+    Configuration.addDefaultResource(LAZY_AM_SITE_XML_FILE);
+    Configuration.addDefaultResource(LAZY_AM_JOB_XML_FILE);
+  }
+  // Prefix shared by the lazy MR AM configuration keys below.
+  public static final String LAZY_MR_AM_PREFIX =
+      MRJobConfig.MR_AM_PREFIX + "lazy.";
+  // Key and default value for the polling interval, in seconds per the key name.
+  public static final String POLLING_INTERVAL_SECONDS =
+      LAZY_MR_AM_PREFIX + "polling-interval.secs";
+  public static final int DEFAULT_POLLING_INTERVAL_SECONDS = 1;
+  // Key and default value for the pre-allocated container count.
+  public static final String PREALLOC_CONTAINER_COUNT =
+      LAZY_MR_AM_PREFIX + "prealloc-container-count";
+  public static final int DEFAULT_PREALLOC_CONTAINER_COUNT = 0;
+  // Creates an instance through the no-argument superclass constructor.
+  public LazyAMConfig() {
+    super();
+  }
+  // Passes conf to the superclass constructor, then reloads unless conf is already a LazyAMConfig.
+  public LazyAMConfig(Configuration conf) {
+    super(conf);
+    if (! (conf instanceof LazyAMConfig)) {
+      this.reloadConfiguration();  // presumably so the lazy-AM resources get loaded -- TODO confirm
+    }
+  }
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,620 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.ampool.rest.ApplicationPollResponse;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class LazyMRAppMaster extends MRAppMaster {
+
+  private static final Log LOG = LogFactory.getLog(LazyMRAppMaster.class);
+
+  // Loop condition of JobPollerRunnable: polling continues while this is false.
+  private volatile boolean stopped;
+  // Set to true by lazyStart() once initAfterJobSubmission() has completed.
+  private volatile boolean jobInitialized;
+  // Set to true by start() after startAM() has been called for a submitted job.
+  private volatile boolean jobStarted;
+
+  // URL polled for a job; main() appends the application attempt id to it.
+  protected final String jobPollingUrl;
+  // Read in init() from LazyAMConfig.POLLING_INTERVAL_SECONDS.
+  private int jobPollingIntervalSeconds;
+  // Placeholder job handed out by LazyStartAppContext.getJob() before a real
+  // job has been started.
+  private NotRunningJob notRunningJob;
+
+  // Services managed by this AM, in registration order (stopped in reverse).
+  private List<Service> services;
+
+  /**
+   * Creates an application master that does not have a job yet.
+   *
+   * @param applicationAttemptId attempt id of this AM
+   * @param containerId id of the container this AM runs in
+   * @param nmHost node manager host
+   * @param nmPort node manager port
+   * @param nmHttpPort node manager HTTP port
+   * @param appSubmitTime submission time passed on to the parent class
+   * @param pollingUrl URL that is polled to find out about a submitted job
+   */
+  public LazyMRAppMaster(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+      long appSubmitTime, String pollingUrl) {
+    super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
+        appSubmitTime);
+    services = new ArrayList<Service>();
+    jobPollingUrl = pollingUrl;
+    // Nothing has been submitted, initialized or stopped at this point.
+    stopped = false;
+    jobInitialized = false;
+    jobStarted = false;
+  }
+
+  /**
+   * Performs the part of the AM initialization that needs the job
+   * configuration, i.e. everything that cannot be done in
+   * {@link #init(Configuration)} because no job was known at that time.
+   *
+   * Sets up tokens/UGI, the application name, the job id and the output
+   * committer, optionally creates the speculator, registers the speculator
+   * and job history event handlers with the dispatcher and finally
+   * initializes the services added here.
+   *
+   * @param jobConf configuration of the submitted job
+   */
+  private void initAfterJobSubmission(Configuration jobConf) {
+
+    downloadTokensAndSetupUGI(jobConf);
+
+    // Job name is the same as the app name until we support DAG of jobs
+    // for an app later
+    appName = jobConf.get(MRJobConfig.JOB_NAME, "<missing app name>");
+
+
+    // The new-API committer is used when the last stage of the job (the
+    // reducer, or the mapper for map-only jobs) is flagged as new-api.
+    newApiCommitter = false;
+    jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+        appAttemptID.getApplicationId().getId());
+    int numReduceTasks = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+    if ((numReduceTasks > 0 &&
+        jobConf.getBoolean("mapred.reducer.new-api", false)) ||
+          (numReduceTasks == 0 &&
+           jobConf.getBoolean("mapred.mapper.new-api", false)))  {
+      newApiCommitter = true;
+      LOG.info("Using mapred newApiCommitter.");
+    }
+
+    committer = createOutputCommitter(jobConf);
+    // The two recovery flags below are only used for the log message; no
+    // recovery service is created in this method.
+    boolean recoveryEnabled = jobConf.getBoolean(
+        MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+
+    // TODO fix - recovery will currently never happen with lazy start
+    // need to follow a different startup route for non-first attempts
+    // for recovery to work with the recovery dispatcher
+    LOG.info("Not starting RecoveryService: recoveryEnabled: "
+        + recoveryEnabled + " recoverySupportedByCommitter: "
+        + recoverySupportedByCommitter + " ApplicationAttemptID: "
+        + appAttemptID.getAttemptId());
+
+
+    if (jobConf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+        || jobConf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+      //optional service to speculate on task attempts' progress
+      speculator = createSpeculator(jobConf, context);
+      addService(speculator);
+    }
+
+    // The dispatcher for speculator events is registered even when no
+    // speculator was created above.
+    speculatorEventDispatcher = new SpeculatorEventDispatcher(jobConf);
+    dispatcher.register(Speculator.EventType.class,
+        speculatorEventDispatcher);
+
+    //service to log job history events
+    jobHistoryEventHandler = createJobHistoryHandler(context);
+    dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+        jobHistoryEventHandler);
+
+    // Add the JobHistoryEventHandler last so that it is properly stopped first.
+    // This will guarantee that all history-events are flushed before AM goes
+    // ahead with shutdown.
+    // Note: Even though JobHistoryEventHandler is started last, if any
+    // component creates a JobHistoryEvent in the meanwhile, it will be just be
+    // queued inside the JobHistoryEventHandler
+    addService(this.jobHistoryEventHandler);
+
+    // init newly added services
+    initServices();
+  }
+
+  @Override
+  protected ContainerRequestor createContainerRequestor(
+      ClientService clientService, AppContext context) {
+    return new LazyRMContainerRequestor(clientService, context);
+  }
+
+  /**
+   * Creates the scheduler/allocator used by the lazy AM.
+   *
+   * @param requestor requestor the allocator sends its container requests to
+   * @param appContext application context handed to the allocator
+   * @return a new {@link LazyRMContainerAllocator}
+   */
+  @Override
+  protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+      AppContext appContext) {
+    // Use the context supplied by the caller instead of silently falling back
+    // to the 'context' field; init() passes that same field, so the existing
+    // call site behaves exactly as before.
+    return new LazyRMContainerAllocator(requestor, appContext);
+  }
+
+
+  /**
+   * Initializes everything that can be set up without a job: reads the
+   * polling interval, creates the application context and the placeholder
+   * {@link NotRunningJob}, creates the services, registers the event
+   * handlers with the dispatcher and initializes the services.
+   *
+   * Job specific setup is deferred to initAfterJobSubmission(). Note that
+   * this method does not call super.init().
+   *
+   * @param conf configuration for the AM; it is modified here (attempt id
+   *          and dispatcher exit-on-error are set) and kept as the current
+   *          configuration
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.jobPollingIntervalSeconds =
+        conf.getInt(LazyAMConfig.POLLING_INTERVAL_SECONDS,
+            LazyAMConfig.DEFAULT_POLLING_INTERVAL_SECONDS);
+
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        appAttemptID.getAttemptId());
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+    this.conf = conf;
+    // Both times stay at -1 until lazyStart() assigns them.
+    super.startTime = -1;
+    super.appSubmitTime = -1;
+
+    context = new LazyStartAppContext(this, conf);
+    notRunningJob = new NotRunningJob(super.appAttemptID,
+        context, conf);
+
+    // add services that can be started without a job
+
+    dispatcher = createDispatcher();
+    addService(dispatcher);
+
+    // service to handle requests from JobClient, start webapp handler
+    clientService = new LazyMRClientService(context);
+    addService(clientService);
+
+    taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
+    addService(taskHeartbeatHandler);
+
+    containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
+    addService(containerHeartbeatHandler);
+
+    //service to handle requests to TaskUmbilicalProtocol
+    taskAttemptListener = createTaskAttemptListener(context,
+        taskHeartbeatHandler, containerHeartbeatHandler);
+    addService(taskAttemptListener);
+
+    containers = new AMContainerMap(containerHeartbeatHandler,
+        taskAttemptListener, context);
+    addService(containers);
+    dispatcher.register(AMContainerEventType.class, containers);
+
+    nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
+    addService(nodes);
+    dispatcher.register(AMNodeEventType.class, nodes);
+
+    //service to do the task cleanup
+    taskCleaner = createTaskCleaner(context);
+    addService(taskCleaner);
+
+    this.jobEventDispatcher = new JobEventDispatcher();
+
+    //register the event dispatchers
+    dispatcher.register(JobEventType.class, jobEventDispatcher);
+    dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+    dispatcher.register(TaskAttemptEventType.class,
+        new TaskAttemptEventDispatcher());
+    dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+
+    //    TODO XXX: Rename to NMComm
+    //    corresponding service to launch allocated containers via NodeManager
+    //    containerLauncher = createNMCommunicator(context);
+    containerLauncher = createContainerLauncher(context);
+    addService(containerLauncher);
+    dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+
+    // service to allocate containers from RM (if non-uber) or to fake it (uber)
+    containerRequestor = createContainerRequestor(clientService, context);
+    addService(containerRequestor);
+    dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+    amScheduler = createAMScheduler(containerRequestor, context);
+    addService(amScheduler);
+    dispatcher.register(AMSchedulerEventType.class, amScheduler);
+
+    // Add the staging directory cleaner before the history server but after
+    // the container allocator so the staging directory is cleaned after
+    // the history has been flushed but before unregistering with the RM.
+    this.stagingDirCleanerService = createStagingDirCleaningService();
+    addService(stagingDirCleanerService);
+
+    // Initializes every service registered above that is still NOTINITED.
+    initServices();
+  }
+
+  /**
+   * Registers the given object with this AM if it is a {@link Service};
+   * anything else (including null) is ignored.
+   *
+   * @param o candidate service
+   */
+  private void addService(Object o) {
+    if (!(o instanceof Service)) {
+      return;
+    }
+    Service service = (Service) o;
+    services.add(service);
+  }
+
+  /**
+   * Initializes, in registration order and with the current configuration,
+   * every registered service that is still in state NOTINITED.
+   */
+  private void initServices() {
+    for (Service service : services) {
+      boolean needsInit = service.getServiceState().equals(STATE.NOTINITED);
+      if (!needsInit) {
+        continue;
+      }
+      service.init(conf);
+    }
+  }
+
+  /**
+   * Starts, in registration order, every registered service that is in
+   * state INITED; services in any other state are left alone.
+   */
+  private void startServices() {
+    for (Service service : services) {
+      boolean readyToStart = service.getServiceState().equals(STATE.INITED);
+      if (!readyToStart) {
+        continue;
+      }
+      service.start();
+    }
+  }
+
+  /**
+   * Stops every registered service that is in state STARTED, in reverse
+   * registration order.
+   *
+   * A failure while stopping one service is logged and does not prevent the
+   * remaining services from being stopped; otherwise a single failing
+   * service would leave all services registered before it running.
+   */
+  private void stopServices() {
+    // stop in reverse order
+    for (int i = services.size() - 1; i >= 0; --i) {
+      Service s = services.get(i);
+      if (!s.getServiceState().equals(STATE.STARTED)) {
+        continue;
+      }
+      try {
+        s.stop();
+      } catch (RuntimeException e) {
+        // Keep going: the remaining services still have to be stopped.
+        LOG.warn("Error stopping service " + s.getName(), e);
+      }
+    }
+  }
+
+  /**
+   * Starts the AM. The method has two modes, selected by jobInitialized:
+   *
+   * Before a job has been initialized it starts the registered services and
+   * then polls for a job. Once lazyStart() has initialized a job (it calls
+   * this method again) it starts the services added for that job and then
+   * starts the AM and its jobs through the parent class.
+   */
+  @Override
+  public void start() {
+    // Only starts services that are INITED, so already running ones are
+    // skipped on the second invocation.
+    startServices();
+    if (!jobInitialized) {
+      Thread poller = new Thread(
+          new JobPollerRunnable(jobPollingUrl, jobPollingIntervalSeconds));
+      poller.setName("JobPollerThread");
+      // NOTE(review): run() executes the poller synchronously on the calling
+      // thread; no thread named "JobPollerThread" is ever started. This call
+      // therefore only returns once the poller loop ends. Confirm whether
+      // poller.start() was intended.
+      poller.run();
+    } else {
+      LOG.info("Job submitted to LazyMRAM. Starting job");
+      super.startAM(conf);
+      this.jobStarted = true;
+      super.startJobs();
+    }
+  }
+
+  /**
+   * Starts the actual job once it has been handed to this AM.
+   *
+   * Loads the job configuration file on top of a copy of the current
+   * configuration, applies the lazy AM specific overrides (job jar, uber
+   * mode and recovery disabled), runs the job dependent initialization and
+   * then calls {@link #start()} again to start the job.
+   *
+   * The JVM is terminated with System.exit(-1) when the configuration file
+   * is not a non-empty file or cannot be accessed.
+   *
+   * @param jobConfFileLocation path of the job configuration file
+   * @param jobJarLocation location of the job jar; ignored when null or empty
+   * @param jobSubmissionTime submission time recorded as appSubmitTime
+   */
+  public void lazyStart(String jobConfFileLocation,
+      String jobJarLocation,
+      long jobSubmissionTime) {
+    super.startTime = super.clock.getTime();
+    super.appSubmitTime = jobSubmissionTime;
+    LOG.info("Launching job using"
+        + ", jobConfLocation=" + jobConfFileLocation
+        + ", jobJarLocation=" + jobJarLocation
+        + ", jobSubmissionTime=" + jobSubmissionTime);
+
+    FileSystem fs;
+    Path confPath = new Path(jobConfFileLocation);
+
+    // Work on a copy so that the AM configuration is only replaced once the
+    // merged configuration is complete.
+    Configuration mergeConf = new Configuration(conf);
+    try {
+      fs = FileSystem.get(conf);
+      FileStatus fStatus = fs.getFileStatus(confPath);
+      if (!fStatus.isFile()
+          || fStatus.getLen() <= 0) {
+        LOG.error("Invalid conf file provided"
+            + ", jobConfLocation=" + jobConfFileLocation);
+        System.exit(-1);
+      } else {
+        LOG.info("Job Conf File Status"
+          + fStatus.toString());
+        // NOTE(review): the stream returned by fs.open() is not closed in
+        // this method - confirm that Configuration closes it after loading.
+        mergeConf.addResource(fs.open(confPath));
+      }
+    } catch (IOException e) {
+      LOG.error("Invalid conf file provided"
+          + ", jobConfLocation=" + jobConfFileLocation, e);
+      System.exit(-1);
+    }
+    if (jobJarLocation != null
+        && !jobJarLocation.isEmpty()) {
+      mergeConf.set(MRJobConfig.JOB_JAR,
+          jobJarLocation);
+    }
+
+    // Explicitly disable uber jobs as AM cannot be relocalized
+    mergeConf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+    // for now disable recovery
+    // TODO fix
+    mergeConf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+
+    this.conf = new JobConf(mergeConf);
+    ((LazyStartAppContext)context).setConfiguration(conf);
+
+    // Init and start main AM
+    LOG.info("Initializing main MR AM");
+    initAfterJobSubmission(conf);
+    // Must be set before start(): it switches start() to the job branch.
+    jobInitialized = true;
+    LOG.info("Starting main MR AM");
+    start();
+    LOG.info("Started main MR AM");
+  }
+
+  /**
+   * Stops the AM: tells the job poller to give up and stops all started
+   * services in reverse registration order.
+   */
+  @Override
+  public void stop() {
+    // JobPollerRunnable loops while 'stopped' is false; without setting the
+    // flag here the poller would never be asked to terminate.
+    this.stopped = true;
+    stopServices();
+  }
+
+  /**
+   * Entry point of the lazy MR application master.
+   *
+   * Reads the container, node manager, submit time and polling URL settings
+   * from the environment, validates them, creates the AM, registers its
+   * shutdown hook and then initializes and starts it as the job user. Any
+   * Throwable is logged and makes the JVM exit with status 1.
+   *
+   * @param args not used
+   */
+  public static void main(String[] args) {
+    try {
+      Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+      DeprecatedKeys.init();
+      String containerIdStr =
+          System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+      String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+      String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+      String nodeHttpPortString =
+          System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
+      String appSubmitTimeStr =
+          System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      String pollingUrl =
+          System.getenv(AMPoolConfiguration.LAZY_AM_POLLING_URL_ENV);
+
+      MRAppMaster.validateInputParam(containerIdStr,
+          ApplicationConstants.AM_CONTAINER_ID_ENV);
+      MRAppMaster.validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+      MRAppMaster.validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+      MRAppMaster.validateInputParam(nodeHttpPortString,
+          ApplicationConstants.NM_HTTP_PORT_ENV);
+      MRAppMaster.validateInputParam(appSubmitTimeStr,
+          ApplicationConstants.APP_SUBMIT_TIME_ENV);
+      MRAppMaster.validateInputParam(pollingUrl,
+          AMPoolConfiguration.LAZY_AM_POLLING_URL_ENV);
+
+      ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+      ApplicationAttemptId applicationAttemptId =
+          containerId.getApplicationAttemptId();
+      long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+      // The attempt id is appended directly, without a separator, so the
+      // environment value presumably already ends with one - confirm.
+      pollingUrl += applicationAttemptId.toString();
+      LOG.info("Polling url for lazy init of job is "
+          + pollingUrl);
+
+      LazyMRAppMaster appMaster =
+          new LazyMRAppMaster(applicationAttemptId, containerId, nodeHostString,
+              Integer.parseInt(nodePortString),
+              Integer.parseInt(nodeHttpPortString), appSubmitTime,
+              pollingUrl);
+      ShutdownHookManager.get().addShutdownHook(
+        new LazyMRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration conf = new YarnConfiguration(new LazyAMConfig());
+      // NOTE(review): the USER environment variable is not validated like the
+      // other inputs above - confirm it is always set for the AM container.
+      String jobUserName = System
+          .getenv(ApplicationConstants.Environment.USER.name());
+      conf.set(MRJobConfig.USER_NAME, jobUserName);
+      // Do not automatically close FileSystem objects so that in case of
+      // SIGTERM I have a chance to write out the job history. I'll be closing
+      // the objects myself.
+      conf.setBoolean("fs.automatic.close", false);
+      initAndStartAppMaster(appMaster, conf, jobUserName);
+    } catch (Throwable t) {
+      LOG.fatal("Error starting MRAppMaster", t);
+      System.exit(1);
+    }
+  }
+
+  protected static void initAndStartAppMaster(final LazyMRAppMaster appMaster,
+      final YarnConfiguration conf, String jobUserName) throws IOException,
+      InterruptedException {
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation appMasterUgi = UserGroupInformation
+        .createRemoteUser(jobUserName);
+
+    appMasterUgi.doAs(new PrivilegedExceptionAction<Object>() {
+      @Override
+      public Object run() throws Exception {
+        appMaster.init(conf);
+        appMaster.start();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Polls the polling URL with HTTP GET requests until a job is handed to
+   * this AM or the AM is stopped.
+   *
+   * A response with status 200 is parsed as an ApplicationPollResponse and
+   * passed to lazyStart(), after which polling ends. Any other status makes
+   * the poller sleep for the polling interval and try again. An I/O error
+   * while polling terminates the JVM with System.exit(-1).
+   */
+  private class JobPollerRunnable implements Runnable {
+
+    private final Log LOG = LogFactory.getLog(JobPollerRunnable.class);
+    private final HttpClient httpClient;
+    private final GetMethod getMethod;
+    private final int pollingIntervalSeconds;
+
+    /**
+     * @param pollingUrl URL that is polled for a job
+     * @param pollingIntervalSeconds pause, in seconds, between two polls
+     */
+    public JobPollerRunnable(String pollingUrl,
+        int pollingIntervalSeconds) {
+      this.httpClient = new HttpClient();
+      this.getMethod = new GetMethod(pollingUrl);
+      this.getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
+          new DefaultHttpMethodRetryHandler(1, false));
+      this.pollingIntervalSeconds = pollingIntervalSeconds;
+    }
+
+    @Override
+    public void run() {
+      while (!stopped) {
+        boolean exceptionThrown = false;
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Polling for job"
+                + ", pollingUrl=" + jobPollingUrl);
+          }
+          int responseCode = httpClient.executeMethod(getMethod);
+          if (responseCode == HttpStatus.SC_OK) {
+            String responseBody = getMethod.getResponseBodyAsString();
+            ObjectMapper mapper = new ObjectMapper();
+            ApplicationPollResponse response =
+                mapper.readValue(responseBody,
+                    ApplicationPollResponse.class);
+            LOG.info("Received poll response"
+                + ", pollingUrl=" + jobPollingUrl
+                + ", pollResonse=" + response);
+            lazyStart(response.getConfigurationFileLocation(),
+                response.getApplicationJarLocation(),
+                response.getApplicationSubmissionTime());
+            break;
+          }
+        } catch (IOException e) {
+          // HttpException is an IOException, so protocol errors end up here
+          // as well. Log with the stack trace to make failures diagnosable.
+          LOG.error("Error while polling for job"
+              + ", pollingUrl=" + jobPollingUrl, e);
+          exceptionThrown = true;
+        } finally {
+          // Release the connection after every poll (also on non-200
+          // responses and on errors) so that the method can be executed
+          // again without holding on to an unconsumed response.
+          getMethod.releaseConnection();
+        }
+        if (exceptionThrown) {
+          LOG.info("Could not connect to polling url. Shutting down");
+          System.exit(-1);
+        }
+        try {
+          // 1000L: compute the pause in long arithmetic to avoid int overflow
+          Thread.sleep(pollingIntervalSeconds * 1000L);
+        } catch (InterruptedException e) {
+          // Deliberately ignored: an interrupt only cuts the pause short, the
+          // loop condition decides whether polling continues.
+          LOG.debug("Interrupted while waiting for next poll", e);
+        }
+      }
+    }
+  }
+
+
+  /**
+   * The shutdown hook that runs when a signal is received AND during normal
+   * close of the JVM. It stops the app master it was created for.
+   */
+  static class LazyMRAppMasterShutdownHook implements Runnable {
+    LazyMRAppMaster appMaster;
+
+    LazyMRAppMasterShutdownHook(LazyMRAppMaster master) {
+      appMaster = master;
+    }
+
+    @Override
+    public void run() {
+      // TODO
+      appMaster.stop();
+    }
+  }
+
+  /**
+   * Application context of the lazy AM. It behaves like the running
+   * context of the parent class, except that it returns placeholder values
+   * while no job has been initialized or started yet.
+   */
+  public class LazyStartAppContext extends RunningAppContext {
+
+    /**
+     * @param appMaster the owning app master (not used by this constructor)
+     * @param config configuration passed to the parent context
+     */
+    public LazyStartAppContext(LazyMRAppMaster appMaster,
+        Configuration config) {
+      super(config);
+    }
+
+    @Override
+    public ApplicationAttemptId getApplicationAttemptId() {
+      return appAttemptID;
+    }
+
+    @Override
+    public ApplicationId getApplicationID() {
+      return appAttemptID.getApplicationId();
+    }
+
+    /** Returns a fixed placeholder name until a job has been initialized. */
+    @Override
+    public String getApplicationName() {
+      return jobInitialized
+          ? super.getApplicationName()
+          : "Job not initialized";
+    }
+
+    public long getSubmitTime() {
+      return appSubmitTime;
+    }
+
+    @Override
+    public long getStartTime() {
+      return super.getStartTime();
+    }
+
+    /**
+     * Returns the placeholder not-running job while no job has been started
+     * and the parent context knows no jobs; otherwise asks the parent.
+     */
+    @Override
+    public Job getJob(JobId jobID) {
+      final boolean noJobYet = !jobStarted && super.getAllJobs().isEmpty();
+      if (!noJobYet) {
+        return super.getJob(jobID);
+      }
+      LOG.info("Getting polled when job is not initialized yet."
+          + " Sending back dummy not running job");
+      return notRunningJob;
+    }
+
+    @Override
+    public Map<JobId, Job> getAllJobs() {
+      return super.getAllJobs();
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public EventHandler getEventHandler() {
+      return super.getEventHandler();
+    }
+
+    /** Returns the empty string until a job has been initialized. */
+    @Override
+    public CharSequence getUser() {
+      if (!jobInitialized) {
+        return "";
+      }
+      return conf.get(MRJobConfig.USER_NAME);
+    }
+
+    @Override
+    public Clock getClock() {
+      return super.getClock();
+    }
+
+    @Override
+    public ClusterInfo getClusterInfo() {
+      return super.getClusterInfo();
+    }
+
+    @Override
+    public AMContainerMap getAllContainers() {
+      return super.getAllContainers();
+    }
+
+    @Override
+    public AMNodeMap getAllNodes() {
+      return super.getAllNodes();
+    }
+
+    /**
+     * @throws YarnException when called before the job has been started
+     */
+    @Override
+    public Map<ApplicationAccessType, String> getApplicationACLs() {
+      if (jobStarted) {
+        return containerRequestor.getApplicationACLs();
+      }
+      throw new YarnException(
+          "Cannot get ApplicationACLs before all services have started");
+    }
+
+  }
+
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyMRAppMaster.LazyStartAppContext;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Client service of the lazy AM. It installs a protocol handler that can
+ * answer job report requests with a dummy report when the lazy application
+ * context does not know the requested job.
+ */
+public class LazyMRClientService extends MRClientService {
+
+  public LazyMRClientService(AppContext appContext) {
+    super(LazyMRClientService.class.getName(),
+        appContext);
+    super.setProtocolHandler(new LazyMRClientProtocolHandler(appContext));
+  }
+
+  /** Protocol handler that special-cases getJobReport() for the lazy AM. */
+  class LazyMRClientProtocolHandler extends MRClientProtocolHandler {
+
+    // null when the handler was not created with a LazyStartAppContext
+    private LazyStartAppContext lazyStartAppContext;
+
+    public LazyMRClientProtocolHandler(AppContext appContext) {
+      super();
+      this.lazyStartAppContext = (appContext instanceof LazyStartAppContext)
+          ? (LazyStartAppContext) appContext
+          : null;
+    }
+
+    /**
+     * Returns a dummy report in state NEW when a lazy context is present
+     * and it returns no job for the requested id; delegates to the parent
+     * handler in every other case.
+     */
+    @Override
+    public GetJobReportResponse getJobReport(GetJobReportRequest request)
+      throws YarnRemoteException {
+      final boolean jobUnknown = lazyStartAppContext != null
+          && lazyStartAppContext.getJob(request.getJobId()) == null;
+      if (!jobUnknown) {
+        return super.getJobReport(request);
+      }
+
+      // create dummy job report as client will start polling before AM is
+      // fully up
+      List<AMInfo> noAmInfos = new ArrayList<AMInfo>();
+      JobReport dummyReport = MRBuilderUtils.newJobReport(request.getJobId(),
+          "", "", JobState.NEW,
+          -1, -1, -1, 0, 0, 0, 0, "", noAmInfos , false, "");
+      GetJobReportResponse dummyResponse =
+          Records.newRecord(GetJobReportResponse.class);
+      dummyResponse.setJobReport(dummyReport);
+      return dummyResponse;
+    }
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+
+/**
+ * Container allocator of the lazy AM. It skips container assignment for as
+ * long as no real job is available.
+ */
+public class LazyRMContainerAllocator extends RMContainerAllocator {
+
+  private static final Log LOG = LogFactory.getLog(
+      LazyRMContainerAllocator.class);
+
+  public LazyRMContainerAllocator(ContainerRequestor requestor,
+      AppContext appContext) {
+    super(requestor, appContext);
+  }
+
+  /**
+   * Delegates to the parent implementation once getJob() returns a job that
+   * is not a {@link NotRunningJob}; does nothing (apart from a debug log)
+   * before that.
+   */
+  @Override
+  protected synchronized void assignContainers() {
+    final Job currentJob = getJob();
+    final boolean jobSubmitted =
+        currentJob != null && !(currentJob instanceof NotRunningJob);
+
+    if (jobSubmitted) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Job submitted. Assigning containers");
+      }
+      super.assignContainers();
+      return;
+    }
+
+    LOG.debug("Nothing to do in assignContainers as no job submitted");
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Container requestor of the lazy AM. On start it can issue a configurable
+ * number of container requests up front, before any job is known.
+ */
+public class LazyRMContainerRequestor extends RMContainerRequestor {
+
+  private int preAllocContainerCount;
+  private int preAllocContainerMemoryMB;
+  private Configuration conf;
+
+  private static final Log LOG =
+      LogFactory.getLog(LazyRMContainerRequestor.class);
+
+  public LazyRMContainerRequestor(ClientService clientService,
+      AppContext context) {
+    super(clientService, context);
+  }
+
+  /**
+   * Reads the pre-allocation settings (container count and map memory),
+   * logs them, rejects negative values and then initializes the parent.
+   *
+   * @param conf configuration to read from; also returned by getConfig()
+   * @throws IllegalArgumentException if either value is negative
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    this.preAllocContainerCount = conf.getInt(
+        LazyAMConfig.PREALLOC_CONTAINER_COUNT,
+        LazyAMConfig.DEFAULT_PREALLOC_CONTAINER_COUNT);
+    this.preAllocContainerMemoryMB = conf.getInt(
+        MRJobConfig.MAP_MEMORY_MB,
+        MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+
+    LOG.info("Prealloc container count for LazyAM is "
+        + preAllocContainerCount
+        + ", memoryMB=" + preAllocContainerMemoryMB);
+
+    final boolean invalidSettings =
+        preAllocContainerCount < 0 || preAllocContainerMemoryMB < 0;
+    if (invalidSettings) {
+      throw new IllegalArgumentException("Invalid config values specified"
+          + ", preAllocationContainerCount=" + preAllocContainerCount
+          + ", mapMemoryMB=" + preAllocContainerMemoryMB);
+    }
+
+    super.init(conf);
+  }
+
+  /**
+   * Starts the parent requestor and then adds one container request per
+   * configured pre-allocated container, each with the configured map memory
+   * and one virtual core, at map priority.
+   */
+  @Override
+  public void start() {
+    super.start();
+    if (preAllocContainerCount > 0) {
+      // The same capability record and empty arrays are shared by all
+      // requests.
+      final Resource capability = Records.newRecord(Resource.class);
+      capability.setMemory(preAllocContainerMemoryMB);
+      capability.setVirtualCores(1);
+
+      final String[] none = new String[0];
+      int remaining = preAllocContainerCount;
+      while (remaining > 0) {
+        ContainerRequest request = new ContainerRequest(capability, none,
+            none, RMContainerAllocator.PRIORITY_MAP);
+        super.addContainerReq(request);
+        --remaining;
+      }
+    }
+  }
+
+  /** Returns the configuration that was passed to init(). */
+  @Override
+  public synchronized Configuration getConfig() {
+    return this.conf;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+/**
+ * Mutable per-application record: an immutable application id plus the
+ * submission context, current attempt id, submission time and job pick-up
+ * time associated with it.
+ *
+ * Every access to the mutable fields, including {@link #toString()}, is
+ * synchronized on this instance.
+ */
+public class AMContext {
+
+  /** Id of the application this record describes; never reassigned. */
+  private final ApplicationId applicationId;
+  /** Submission context; {@code null} until one is set. */
+  private ApplicationSubmissionContext submissionContext;
+  /** Current attempt id; {@code null} until one is set. */
+  private ApplicationAttemptId currentApplicationAttemptId;
+  /** Submission time; left at the field default (0) until set. */
+  private long applicationSubmissionTime;
+  /**
+   * Job pick-up time; -1 until set.
+   * NOTE(review): AMPoolClientRMProxy compares this value against
+   * System.currentTimeMillis(), so it is presumably epoch milliseconds --
+   * confirm with the code that sets it.
+   */
+  private long jobPickUpTime = -1;
+
+  /**
+   * Creates a record with no submission context and no attempt id.
+   *
+   * @param applicationId id of the application this record describes
+   */
+  public AMContext(ApplicationId applicationId) {
+    this.applicationId = applicationId;
+    this.submissionContext = null;
+    this.currentApplicationAttemptId = null;
+  }
+
+  /** @return the application id given to the constructor */
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  /** @return the submission context, or {@code null} if none has been set */
+  public synchronized ApplicationSubmissionContext getSubmissionContext() {
+    return submissionContext;
+  }
+
+  /** @param submissionContext the submission context to store */
+  public synchronized void setSubmissionContext(ApplicationSubmissionContext submissionContext) {
+    this.submissionContext = submissionContext;
+  }
+
+  /** @return the current attempt id, or {@code null} if none has been set */
+  public synchronized ApplicationAttemptId getCurrentApplicationAttemptId() {
+    return currentApplicationAttemptId;
+  }
+
+  /** @param currentApplicationAttemptId the attempt id to store */
+  public synchronized void setCurrentApplicationAttemptId(
+      ApplicationAttemptId currentApplicationAttemptId) {
+    this.currentApplicationAttemptId = currentApplicationAttemptId;
+  }
+
+  /** @return the stored submission time (0 if never set) */
+  public synchronized long getApplicationSubmissionTime() {
+    return applicationSubmissionTime;
+  }
+
+  /** @param applicationSubmissionTime the submission time to store */
+  public synchronized void setApplicationSubmissionTime(long applicationSubmissionTime) {
+    this.applicationSubmissionTime = applicationSubmissionTime;
+  }
+
+  /** @return the stored job pick-up time (-1 if never set) */
+  public synchronized long getJobPickUpTime() {
+    return jobPickUpTime;
+  }
+
+  /** @param jobPickUpTime the job pick-up time to store */
+  public synchronized void setJobPickUpTime(long jobPickUpTime) {
+    this.jobPickUpTime = jobPickUpTime;
+  }
+
+  /**
+   * Renders the record for logging. The contexts themselves are not printed,
+   * only whether they are {@code null}.
+   *
+   * Synchronized so that all fields are read under the same lock that guards
+   * the setters: the result is one consistent snapshot and the
+   * {@code long} fields are never read outside the monitor.
+   */
+  @Override
+  public synchronized String toString() {
+    return "applicationId=" + applicationId
+        + ", applicationSubmissionContextIsNull="
+        + (submissionContext == null)
+        + ", jobPickupTime=" + jobPickUpTime
+        + ", applicationSubmissionTime=" + applicationSubmissionTime
+        + ", applicationAttemptIdIsNull=" +
+        (currentApplicationAttemptId == null);
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+/**
+ * Single-method contract for triggering the launch of an AM.
+ * NOTE(review): no implementation is visible alongside this interface; what
+ * exactly gets launched, and whether the call blocks, is up to the
+ * implementor -- confirm there.
+ */
+public interface AMLauncher {
+
+  /** Launches an AM. Takes no arguments and returns nothing. */
+  void launchAM();
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event carrying an {@link AMMonitorEventType} together with the id of the
+ * application it refers to.
+ */
+public class AMMonitorEvent extends AbstractEvent<AMMonitorEventType> {
+
+  /** Application the event refers to; fixed at construction. */
+  private final ApplicationId applicationId;
+
+  /**
+   * @param type          event type, handed to the {@code AbstractEvent}
+   *                      constructor
+   * @param applicationId id of the application the event refers to
+   */
+  public AMMonitorEvent(AMMonitorEventType type, ApplicationId applicationId) {
+    super(type);
+    this.applicationId = applicationId;
+  }
+
+  /** @return the application id supplied to the constructor */
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+/** The two kinds of AM monitor event. */
+public enum AMMonitorEventType {
+
+  /** Asks for monitoring of an application to begin. */
+  AM_MONITOR_START,
+
+  /** Asks for monitoring of an application to end. */
+  AM_MONITOR_STOP;
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.ampool.manager.AMFinishedEvent;
+
+public class AMMonitorService extends AbstractService
+  implements EventHandler<AMMonitorEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AMMonitorService.class);
+
+  private final AMPoolContext context;
+  private ScheduledExecutorService scheduler = null;
+  Map<ApplicationId, ScheduledFuture<?>> scheduledFutures;
+  private long heartbeatInterval = 1000l;
+
+  public AMMonitorService(AMPoolContext context) {
+    super(AMMonitorService.class.getName());
+    // TODO Auto-generated constructor stub
+    this.context = context;
+    this.scheduledFutures = new HashMap<ApplicationId, ScheduledFuture<?>>();
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    scheduler = Executors.newScheduledThreadPool(
+        conf.getInt(AMPoolConfiguration.MAX_AM_POOL_SIZE,
+            AMPoolConfiguration.DEFAULT_MAX_AM_POOL_SIZE));
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Starting AMMonitorService");
+    super.start();
+  }
+
+  @Override
+  public synchronized void stop() {
+    LOG.info("Stopping AMMonitorService");
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+    }
+  }
+
+  private synchronized void addAppToMonitor(ApplicationId applicationId) {
+    LOG.info("Starting to monitor application"
+        + ", applicationId=" + applicationId);
+    AMMonitorTask amMonitorTask =
+        new AMMonitorTask(applicationId);
+    ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
+        amMonitorTask, heartbeatInterval, heartbeatInterval,
+        TimeUnit.MILLISECONDS);
+    scheduledFutures.put(applicationId, future);
+  }
+
+  private synchronized void removeAppFromMonitor(ApplicationId applicationId) {
+    LOG.info("Stopping to monitor application"
+        + ", applicationId=" + applicationId);
+    if (!scheduledFutures.containsKey(applicationId)) {
+      return;
+    }
+    scheduledFutures.get(applicationId).cancel(true);
+    scheduledFutures.remove(applicationId);
+  }
+
+  private class AMMonitorTask implements Runnable {
+    private final ApplicationId applicationId;
+
+    public AMMonitorTask(ApplicationId applicationId) {
+      this.applicationId = applicationId;
+    }
+
+    @SuppressWarnings("unchecked")
+    public void run() {
+      try {
+        LOG.info("Trying to monitor status for application"
+            + ", applicationId=" + applicationId);
+        ApplicationReport report =
+            context.getRMYarnClient().getApplicationReport(applicationId);
+        if (report == null) {
+          LOG.warn("Received null report from RM for"
+              + " applicationId=" + applicationId);
+          return;
+        }
+        LOG.info("Monitoring status for application"
+            + ", applicationId=" + applicationId
+            + ", yarnApplicationState=" + report.getYarnApplicationState()
+            + ", finalApplicationStatus="
+            + report.getFinalApplicationStatus());
+
+        // TODO needs a more cleaner fix
+        if (report.getYarnApplicationState() != null) {
+          if (report.getYarnApplicationState()
+                == YarnApplicationState.FAILED
+              || report.getYarnApplicationState()
+                == YarnApplicationState.KILLED
+              || report.getYarnApplicationState()
+                == YarnApplicationState.FINISHED) {
+            if (report.getFinalApplicationStatus() ==
+                FinalApplicationStatus.UNDEFINED) {
+              report.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
+            }
+          }
+        }
+        if (report.getFinalApplicationStatus() ==
+            FinalApplicationStatus.FAILED
+            || report.getFinalApplicationStatus() ==
+            FinalApplicationStatus.SUCCEEDED
+            || report.getFinalApplicationStatus() ==
+            FinalApplicationStatus.KILLED) {
+          AMFinishedEvent event = new AMFinishedEvent(applicationId,
+              report.getFinalApplicationStatus());
+          context.getDispatcher().getEventHandler().handle(event);
+          AMMonitorEvent mEvent = new AMMonitorEvent(
+              AMMonitorEventType.AM_MONITOR_STOP, applicationId);
+          context.getDispatcher().getEventHandler().handle(mEvent);
+        }
+      } catch (YarnRemoteException e) {
+        // TODO Auto-generated catch block
+        LOG.error("Failed to acquire monitoring status for application"
+            + ", applicationId=" + applicationId);
+        e.printStackTrace();
+      }
+    }
+
+  }
+
+  @Override
+  public synchronized void handle(AMMonitorEvent event) {
+    if (event.getType() == AMMonitorEventType.AM_MONITOR_START) {
+      addAppToMonitor(event.getApplicationId());
+    } else if (event.getType() == AMMonitorEventType.AM_MONITOR_STOP) {
+      removeAppFromMonitor(event.getApplicationId());
+    }
+  }
+
+}

Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyAMConfig;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.Records;
+
+public class AMPoolClientRMProxy extends AbstractService
+    implements ClientRMProtocol {
+
+  private static final Log LOG = LogFactory.getLog(
+      AMPoolClientRMProxy.class);
+
+  private final AMPoolContext context;
+  private Server server;
+  InetSocketAddress clientBindAddress;
+
+  public AMPoolClientRMProxy(AMPoolContext context) {
+    super(AMPoolClientRMProxy.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    clientBindAddress = getBindAddress(conf);
+
+    InetSocketAddress rmAddress =
+        conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_PORT);
+
+    if (clientBindAddress.equals(rmAddress)) {
+      throw new RuntimeException(
+          "AMPoolService's RM proxy is running on the same address"
+          + " as the main RM"
+          + ", AMPoolServiceBindAddress=" + clientBindAddress.toString()
+          + ", RMBindAddress=" + rmAddress.toString());
+    }
+
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    LOG.info("Starting AMPoolClientRMProxy service");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    LOG.info("Starting ClientRMProtocol listener at"
+        + clientBindAddress.toString());
+    this.server =
+      rpc.getServer(ClientRMProtocol.class, this,
+            clientBindAddress,
+            conf, null,
+            conf.getInt(AMPoolConfiguration.RM_PROXY_CLIENT_THREAD_COUNT,
+                AMPoolConfiguration.DEFAULT_RM_PROXY_CLIENT_THREAD_COUNT));
+
+    this.server.start();
+    clientBindAddress = conf.updateConnectAddr(
+        AMPoolConfiguration.RM_PROXY_CLIENT_ADDRESS,
+        server.getListenerAddress());
+    LOG.info("Started ClientRMProtocol listener at"
+        + clientBindAddress.toString());
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping AMPoolClientRMProxy service");
+    if (this.server != null) {
+        this.server.stop();
+    }
+    super.stop();
+  }
+
+  InetSocketAddress getBindAddress(Configuration conf) {
+    return conf.getSocketAddr(AMPoolConfiguration.RM_PROXY_CLIENT_ADDRESS,
+            AMPoolConfiguration.DEFAULT_RM_PROXY_CLIENT_ADDRESS,
+            AMPoolConfiguration.DEFAULT_RM_PROXY_CLIENT_PORT);
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnRemoteException {
+    ApplicationId appId = context.getNewApplicationId();
+    if (appId != null) {
+      LOG.info("Received a getNewApplication request"
+          + ", assignedAppId=" + appId.toString());
+      GetNewApplicationResponse response = Records.newRecord(
+          GetNewApplicationResponse.class);
+      response.setApplicationId(appId);
+      response.setMaximumResourceCapability(context.getMaxResourceCapability());
+      response.setMinimumResourceCapability(context.getMinResourceCapability());
+      return response;
+    }
+    LOG.info("Received a getNewApplication request, proxying to real RM");
+    return context.getRMYarnClient().getNewApplication();
+  }
+
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnRemoteException {
+    // TODO handle kills for managed AMs
+    context.getRMYarnClient().killApplication(
+        request.getApplicationId());
+    return Records.newRecord(KillApplicationResponse.class);
+  }
+
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnRemoteException {
+    GetApplicationReportResponse response = Records.newRecord(
+        GetApplicationReportResponse.class);
+    ApplicationId applicationId = request.getApplicationId();
+    ApplicationReport applicationReport =
+        context.getRMYarnClient().getApplicationReport(applicationId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received an application report request"
+          + ", applicationId=" + applicationId);
+    }
+
+    // Handle submitted jobs that have not been picked up by AM
+    long jobPickUpTime = -1;
+    if ((applicationReport != null
+        && applicationReport.getYarnApplicationState() != null
+        && applicationReport.getYarnApplicationState()
+            == YarnApplicationState.RUNNING)
+        && context.isManagedApp(applicationId)) {
+      AMContext amContext = context.getAMContext(applicationId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AMContext is null? = " + (amContext == null));
+        if (amContext != null) {
+          LOG.debug("AMContext dump: " + amContext.toString());
+        }
+      }
+      if (amContext == null || amContext.getSubmissionContext() == null) {
+        applicationReport.setYarnApplicationState(
+            YarnApplicationState.NEW);
+      } else {
+        // TODO fix
+        // Needs a pingback from the AM to really fix this correctly
+        // 5 second delay is not necessarily a guarantee
+        int pollWaitInterval = getConfig().getInt(
+            LazyAMConfig.POLLING_INTERVAL_SECONDS,
+            LazyAMConfig.DEFAULT_POLLING_INTERVAL_SECONDS) * 5000;
+        if (amContext.getCurrentApplicationAttemptId() == null
+            || amContext.getJobPickUpTime() <= 0
+            || (System.currentTimeMillis() <
+              (amContext.getJobPickUpTime() + pollWaitInterval))) {
+          jobPickUpTime = amContext.getJobPickUpTime();
+          applicationReport.setYarnApplicationState(
+              YarnApplicationState.SUBMITTED);
+        }
+      }
+      LOG.info("Received an application report request"
+          + ", applicationId=" + applicationId
+          + ", applicationState=" + applicationReport.getYarnApplicationState()
+          + ", jobPickupTime=" + jobPickUpTime);
+    }
+    response.setApplicationReport(applicationReport);
+    return response;
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnRemoteException {
+    GetClusterMetricsResponse response = Records.newRecord(
+        GetClusterMetricsResponse.class);
+    response.setClusterMetrics(
+        context.getRMYarnClient().getYarnClusterMetrics());
+    return response;
+  }
+
+  @Override
+  public GetAllApplicationsResponse getAllApplications(
+      GetAllApplicationsRequest request) throws YarnRemoteException {
+    GetAllApplicationsResponse response = Records.newRecord(
+        GetAllApplicationsResponse.class);
+    response.setApplicationList(
+        context.getRMYarnClient().getApplicationList());
+    return response;
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+      throws YarnRemoteException {
+    GetClusterNodesResponse response = Records.newRecord(
+        GetClusterNodesResponse.class);
+    response.setNodeReports(context.getRMYarnClient().getNodeReports());
+    return response;
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnRemoteException {
+    GetQueueInfoResponse response = Records.newRecord(
+        GetQueueInfoResponse.class);
+    response.setQueueInfo(context.getRMYarnClient().getQueueInfo(
+        request.getQueueName()));
+    return response;
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
+    GetQueueUserAclsInfoResponse response =
+        Records.newRecord(GetQueueUserAclsInfoResponse.class);
+    response.setUserAclsInfoList(
+        context.getRMYarnClient().getQueueAclsInfo());
+    return response;
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnRemoteException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnRemoteException {
+    if (!context.isManagedApp(
+        request.getApplicationSubmissionContext().getApplicationId())) {
+      LOG.info("Received a submitApplication request for non-managed app"
+          + ", proxying to real RM"
+          + ", appId="
+          + request.getApplicationSubmissionContext().getApplicationId());
+      context.getRMYarnClient().submitApplication(
+          request.getApplicationSubmissionContext());
+      return Records.newRecord(SubmitApplicationResponse.class);
+    }
+    LOG.info("Received a submitApplication request for managed app"
+        + ", appId="
+        + request.getApplicationSubmissionContext().getApplicationId());
+    return context.submitApplication(request);
+  }
+
+}