You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ac...@apache.org on 2013/03/15 22:26:48 UTC
svn commit: r1457129 [2/38] - in /incubator/tez: ./ tez-ampool/
tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/
tez-ampool/src/main/conf/ tez-ampool/src/main/java/
tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-amp...
Added: incubator/tez/tez-ampool/src/main/bin/tez-config.sh
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/bin/tez-config.sh?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/bin/tez-config.sh (added)
+++ incubator/tez/tez-ampool/src/main/bin/tez-config.sh Fri Mar 15 21:26:36 2013
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Included in all the tez scripts with the source command;
+# should not be executed directly.
+# It should also not be passed any arguments, since it needs the caller's original $*.
+
+# Resolve links ($0 may be a softlink) and convert a relative path
+# to an absolute path. NB: The -P option requires bash built-ins
+# or POSIX:2001 compliant cd and pwd.
+
+# TEZ_CLASSPATH Extra Java CLASSPATH entries.
+#
+
+this="${BASH_SOURCE-$0}"  # path of this file: BASH_SOURCE when set, otherwise $0
+tez_bin=$(cd -P -- "$(dirname -- "$this")" && pwd -P)  # physical (symlink-free) directory containing this file
+script="$(basename -- "$this")"
+this="$tez_bin/$script"  # absolute path of this file
+
+TEZ_DIR=${TEZ_DIR:-""}  # sub-directory of TEZ_HOME added to the classpath further down; empty by default
+TEZ_LIB_DIR=${TEZ_LIB_DIR:-"lib"}  # library sub-directory of TEZ_HOME added to the classpath; "lib" by default
+
+TEZ_DEFAULT_HOME=$(cd -P -- "$tez_bin"/.. && pwd -P)  # parent of the directory holding this file
+TEZ_HOME=${TEZ_HOME:-$TEZ_DEFAULT_HOME}  # a TEZ_HOME already set by the caller takes precedence
+export TEZ_HOME
+
+#check to see if the conf dir is given as an optional argument
+if [ $# -gt 1 ]  # needs at least two arguments: the flag and its value
+then
+ if [ "--config" = "$1" ]
+ then
+ shift  # drop "--config"
+ confdir=$1
+ shift  # drop the directory argument
+ TEZ_CONF_DIR=$confdir
+ fi
+fi
+
+export TEZ_CONF_DIR="${TEZ_CONF_DIR:-$TEZ_HOME/conf}"  # falls back to $TEZ_HOME/conf when unset or empty
+
+if [ -f "${TEZ_CONF_DIR}/tez-env.sh" ]; then
+ . "${TEZ_CONF_DIR}/tez-env.sh"  # pull in site-specific environment settings when the file exists
+fi
+
+# Newer versions of glibc use an arena memory allocator that causes virtual
+# memory usage to explode.
+export MALLOC_ARENA_MAX=${MALLOC_ARENA_MAX:-4}  # the default of 4 applies only when the variable is unset or empty
+
+# Attempt to set JAVA_HOME if it is not set
+if [[ -z $JAVA_HOME ]]; then
+ # On OSX use java_home (or /Library for older versions)
+ if [ "Darwin" == "$(uname -s)" ]; then
+ if [ -x /usr/libexec/java_home ]; then
+ export JAVA_HOME="$(/usr/libexec/java_home)"  # plain string, not an array: bash does not export array variables to child processes
+ else
+ export JAVA_HOME=/Library/Java/Home
+ fi
+ fi
+
+ # Bail if we did not detect it
+ if [[ -z $JAVA_HOME ]]; then
+ echo "Error: JAVA_HOME is not set and could not be found." 1>&2
+ exit 1  # this file is sourced, so this also terminates the calling script
+ fi
+fi
+
+JAVA=$JAVA_HOME/bin/java  # java launcher under JAVA_HOME
+# some Java parameters
+JAVA_HEAP_MAX=-Xmx1024m  # default maximum heap: 1024 MB
+
+# check envvars which might override default args
+if [[ "$TEZ_HEAPSIZE" != "" ]]; then
+ #echo "run with heapsize $TEZ_HEAPSIZE"
+ JAVA_HEAP_MAX="-Xmx""$TEZ_HEAPSIZE""m"  # TEZ_HEAPSIZE is a number of megabytes (an "m" suffix is appended)
+ #echo $JAVA_HEAP_MAX
+fi
+
+# CLASSPATH initially contains $TEZ_CONF_DIR
+CLASSPATH="${TEZ_CONF_DIR}"
+
+if [ "$TEZ_USER_CLASSPATH_FIRST" != "" ] && [ "$TEZ_CLASSPATH" != "" ] ; then
+ CLASSPATH=${CLASSPATH}:${TEZ_CLASSPATH}  # user entries go ahead of the Tez and Hadoop entries
+fi
+
+CLASSPATH=${CLASSPATH}:${TEZ_HOME}/${TEZ_DIR}'/*':${TEZ_HOME}/${TEZ_LIB_DIR}'/*'  # the quoted '/*' is stored literally, not expanded by the shell
+# so that filenames w/ spaces are handled correctly in loops below
+IFS=
+
+if [[ -z $HADOOP_HOME ]]; then  # no HADOOP_HOME: look for hadoop on $PATH
+ HADOOP_PATH=$(command -v hadoop)  # shell builtin lookup; does not depend on an external which
+ result=$?
+ if [ "$result" != "0" ]; then
+ echo 'Failed to find hadoop in $PATH.' 1>&2  # diagnostics go to stderr, like the JAVA_HOME error in this file
+ echo 'Please ensure that HADOOP_HOME is defined or hadoop is in your $PATH.' 1>&2
+ exit 1
+ fi
+else
+ HADOOP_PATH="$HADOOP_HOME/bin/hadoop"
+fi
+
+HADOOP_CLASSPATH=$("$HADOOP_PATH" classpath)  # ask hadoop for its own classpath
+result=$?
+if [ "$result" != "0" ]; then
+ echo "Failed to run $HADOOP_PATH classpath. Result=$result." 1>&2
+ echo 'Please ensure that HADOOP_HOME is defined or hadoop is in your $PATH.' 1>&2
+ exit 1
+fi
+CLASSPATH=${CLASSPATH}:$HADOOP_CLASSPATH
+
+# add user-specified CLASSPATH last
+if [ "$TEZ_USER_CLASSPATH_FIRST" = "" ] && [ "$TEZ_CLASSPATH" != "" ]; then
+ CLASSPATH=${CLASSPATH}:${TEZ_CLASSPATH}  # default ordering: user entries after the Hadoop classpath
+fi
+
+# default log directory & file
+if [ "$TEZ_LOG_DIR" = "" ]; then
+ TEZ_LOG_DIR="$TEZ_HOME/logs"  # assigned without export in this script
+fi
+
+# restore ordinary behaviour
+unset IFS  # undoes the empty IFS set earlier in this script
+
+echo "$CLASSPATH"  # NOTE(review): prints the classpath to stdout every time this file is sourced - confirm this is intended
Added: incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh (added)
+++ incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh Fri Mar 15 21:26:36 2013
@@ -0,0 +1,177 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Runs a Tez command as a daemon.
+#
+# Environment Variables
+#
+# TEZ_CONF_DIR Alternate conf dir. Default is ${TEZ_HOME}/conf.
+# TEZ_LOG_DIR Where log files are stored. ${TEZ_HOME}/logs by default.
+# TEZ_PID_DIR Where pid files are stored. /tmp by default.
+# TEZ_IDENT_STRING A string representing this instance of tez. $USER by default
+# TEZ_NICENESS The scheduling priority for daemons. Defaults to 0.
+##
+
+usage="Usage: tez-daemon.sh [--config <conf-dir>] (start|stop) <tez-command> <args...>"
+
+# if no args specified, show usage
+if [ $# -le 1 ]; then  # fewer than two arguments: print usage and fail
+ echo $usage
+ exit 1
+fi
+
+bin=`dirname "${BASH_SOURCE-$0}"`
+bin=`cd "$bin"; pwd`  # absolute directory of this script
+
+DEFAULT_LIBEXEC_DIR="$bin"/../libexec
+TEZ_LIBEXEC_DIR=${TEZ_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}  # a TEZ_LIBEXEC_DIR from the environment wins over the default
+. $TEZ_LIBEXEC_DIR/tez-config.sh  # sourced without arguments, so it sees (and may shift) this script's own arguments
+
+# get arguments
+
+startStop=$1  # action: start or stop
+shift
+command=$1  # name of the tez command to run
+shift
+
+tez_rotate_log ()  # usage: tez_rotate_log <logfile> [<generations>]; moves <logfile>.N to <logfile>.N+1 and <logfile> to <logfile>.1
+{
+ local log=$1  # file to rotate; local so the caller's variables are left untouched
+ local num=5 prev  # generations kept (default 5) and loop scratch
+ if [ -n "$2" ]; then
+ num=$2
+ fi
+ if [ -f "$log" ]; then # rotate logs
+ while [ "$num" -gt 1 ]; do
+ prev=$((num - 1))
+ [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"  # oldest generation is overwritten first
+ num=$prev
+ done
+ mv "$log" "$log.$num";  # num is 1 here when the loop ran to completion
+ fi
+}
+
+if [ -f "${TEZ_CONF_DIR}/tez-env.sh" ]; then
+ . "${TEZ_CONF_DIR}/tez-env.sh"  # site-specific environment overrides, when present
+fi
+
+if [ "$TEZ_IDENT_STRING" = "" ]; then
+ export TEZ_IDENT_STRING="$USER"  # instance identifier defaults to the invoking user
+fi
+
+# get log directory
+if [ "$TEZ_LOG_DIR" = "" ]; then
+ export TEZ_LOG_DIR="$TEZ_HOME/logs"
+fi
+
+if [ ! -w "$TEZ_LOG_DIR" ] ; then  # missing or not writable
+ mkdir -p "$TEZ_LOG_DIR"
+ chown "$TEZ_IDENT_STRING" "$TEZ_LOG_DIR"  # quoted so a log path containing spaces stays a single argument
+fi
+
+if [ "$TEZ_PID_DIR" = "" ]; then
+ TEZ_PID_DIR=/tmp  # default pid directory
+fi
+
+# some variables
+export TEZ_LOGFILE=tez-$TEZ_IDENT_STRING-$command-$HOSTNAME.log  # log file name handed to the JVM as tez.log.file below
+export TEZ_ROOT_LOGGER=${TEZ_ROOT_LOGGER:-"INFO,RFA"}  # default root logger for daemons: INFO,RFA
+log=$TEZ_LOG_DIR/tez-$TEZ_IDENT_STRING-$command-$HOSTNAME.out  # receives the daemon's stdout and stderr
+pid=$TEZ_PID_DIR/tez-$TEZ_IDENT_STRING-$command.pid  # pid file, one per ident string and command
+TEZ_STOP_TIMEOUT=${TEZ_STOP_TIMEOUT:-5}  # seconds to wait after the first kill before resorting to kill -9
+
+# Set default scheduling priority
+if [ "$TEZ_NICENESS" = "" ]; then
+ export TEZ_NICENESS=0
+fi
+
+TEZ_OPTS="$TEZ_OPTS -Dtez.log.dir=$TEZ_LOG_DIR"  # the -D options appended here are passed on the java command line
+TEZ_OPTS="$TEZ_OPTS -Dtez.log.file=$TEZ_LOGFILE"
+TEZ_OPTS="$TEZ_OPTS -Dtez.home.dir=$TEZ_HOME"
+TEZ_OPTS="$TEZ_OPTS -Dtez.id.str=$TEZ_IDENT_STRING"
+TEZ_OPTS="$TEZ_OPTS -Dtez.root.logger=${TEZ_ROOT_LOGGER:-INFO,console}"  # TEZ_ROOT_LOGGER is already defaulted to INFO,RFA a few lines up, so this console fallback is never used
+
+case $startStop in  # dispatch on the requested action
+
+ (start)
+
+ [ -w "$TEZ_PID_DIR" ] || mkdir -p "$TEZ_PID_DIR"  # create the pid directory when it is missing or not writable
+
+ if [ -f $pid ]; then
+ if kill -0 `cat $pid` > /dev/null 2>&1; then  # signal 0 only probes whether the recorded process exists
+ echo $command running as process `cat $pid`. Stop it first.
+ exit 1
+ fi
+ fi
+
+ tez_rotate_log $log  # keep previous .out files as numbered generations
+ echo starting $command, logging to $log
+ cd "$TEZ_HOME"
+ case $command in
+ ampoolservice)
+ export CLASSPATH=$CLASSPATH  # make the assembled classpath visible to the JVM
+ nohup nice -n $TEZ_NICENESS "$JAVA" $JAVA_HEAP_MAX $TEZ_OPTS \
+ org.apache.tez.ampool.AMPoolService --cli \
+ > "$log" 2>&1 < /dev/null &
+
+ ;;
+ (*)
+ echo "Unknown command $command"
+ echo "Only command 'ampoolservice' is supported"
+ exit 1
+ ;;
+ esac
+ echo $! > $pid  # $! is the background job launched in the ampoolservice branch
+ sleep 1
+ echo "ulimit -a for user $USER" >> $log
+ ulimit -a >> $log 2>&1
+ head -30 "$log"  # show the first lines of the daemon output to the operator
+ sleep 3;
+ if ! ps -p $! > /dev/null ; then  # report failure when the daemon has already exited
+ exit 1
+ fi
+ ;;
+
+ (stop)
+
+ if [ -f $pid ]; then
+ TARGET_PID=`cat $pid`
+ if kill -0 $TARGET_PID > /dev/null 2>&1; then
+ echo stopping $command
+ kill $TARGET_PID  # default signal first
+ sleep $TEZ_STOP_TIMEOUT
+ if kill -0 $TARGET_PID > /dev/null 2>&1; then  # still alive after the timeout
+ echo "$command did not stop gracefully after $TEZ_STOP_TIMEOUT seconds: killing with kill -9"
+ kill -9 $TARGET_PID
+ fi  # NOTE(review): the pid file is not removed after a stop - confirm whether a stale file is acceptable
+ else
+ echo no $command to stop
+ fi
+ else
+ echo no $command to stop
+ fi
+ ;;
+
+ (*)
+ echo $usage
+ exit 1
+ ;;
+
+esac
+
+
Propchange: incubator/tez/tez-ampool/src/main/bin/tez-daemon.sh
------------------------------------------------------------------------------
svn:executable = *
Added: incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml (added)
+++ incubator/tez/tez-ampool/src/main/conf/lazy-mram-site.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,40 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <!-- Uncomment and set values as needed.
+ The commented-out values below duplicate the default settings. -->
+
+ <!--
+ <property>
+ <description>How often to poll AMPoolService for job assignment</description>
+ <name>yarn.app.mapreduce.am.lazy.polling-interval.secs</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <description>How many containers to pre-allocate after starting up</description>
+ <name>yarn.app.mapreduce.am.lazy.prealloc-container-count</name>
+ <value>0</value>
+ </property>
+ -->
+
+</configuration>
Added: incubator/tez/tez-ampool/src/main/conf/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/log4j.properties?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/log4j.properties (added)
+++ incubator/tez/tez-ampool/src/main/conf/log4j.properties Fri Mar 15 21:26:36 2013
@@ -0,0 +1,80 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Define some default values that can be overridden by system properties
+tez.root.logger=INFO,console
+tez.log.dir=.
+tez.log.file=tez.log
+
+# Define the root logger to the system property "tez.root.logger".
+log4j.rootLogger=${tez.root.logger}
+
+# Logging Threshold
+log4j.threshold=ALL
+
+# Null Appender
+log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
+
+#
+# Rolling File Appender - cap space usage at about 2.75GB (256MB x 10 backups, plus the active file).
+#
+tez.log.maxfilesize=256MB
+tez.log.maxbackupindex=10
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.File=${tez.log.dir}/${tez.log.file}
+
+log4j.appender.RFA.MaxFileSize=${tez.log.maxfilesize}
+log4j.appender.RFA.MaxBackupIndex=${tez.log.maxbackupindex}
+
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${tez.log.dir}/${tez.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+# 30-day backup
+#log4j.appender.DRFA.MaxBackupIndex=30
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
Added: incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml (added)
+++ incubator/tez/tez-ampool/src/main/conf/tez-ampool-site.xml Fri Mar 15 21:26:36 2013
@@ -0,0 +1,119 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<configuration>
+
+ <!-- Uncomment and set values as needed.
+ The commented-out values below duplicate the default settings. -->
+
+ <!--
+ <property>
+ <description>Memory to use for AMPoolService AM</description>
+ <name>tez.ampool.am.master_memory</name>
+ <value>1024</value>
+ </property>
+
+ <property>
+ <description>Queue to launch AMPoolService AM</description>
+ <name>tez.ampool.am.master_queue</name>
+ <value>default</value>
+ </property>
+
+ <property>
+ <description>Port to use for AMPoolService status</description>
+ <name>tez.ampool.ws.port</name>
+ <value>12999</value>
+ </property>
+
+ <property>
+ <description>Minimum size of AM Pool</description>
+ <name>tez.ampool.am-pool-size</name>
+ <value>3</value>
+ </property>
+
+ <property>
+ <description>Maximum size of AM Pool</description>
+ <name>tez.ampool.max-am-pool-size</name>
+ <value>5</value>
+ </property>
+
+ <property>
+ <description>When to launch new AM. If true, launched after an existing AM in pool completes. Else, launched as soon as a job is assigned to an AM from the pool</description>
+ <name>tez.ampool.launch-new-am-after-app-completion</name>
+ <value>true</value>
+ </property>
+
+ <property>
+ <description>How many launch failures to account for unassigned AMs before shutting down AMPoolService</description>
+ <name>tez.ampool.max-am-launch-failures</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>No. of threads to use to serve the ClientRMProtocol proxy fronted by the AMPoolService</description>
+ <name>tez.ampool.rm-proxy-client.thread-count</name>
+ <value>10</value>
+ </property>
+
+ <property>
+ <description>Address on which to run the ClientRMProtocol proxy</description>
+ <name>tez.ampool.address</name>
+ <value>0.0.0.0:10030</value>
+ </property>
+
+ <property>
+ <description>Memory to use when launching the lazy MR AM</description>
+ <name>tez.ampool.mr-am.memory-allocation-mb</name>
+ <value>1536</value>
+ </property>
+
+ <property>
+ <description>Queue to which the Lazy MRAM is to be submitted</description>
+ <name>tez.ampool.mr-am.queue-name</name>
+ <value>default</value>
+ </property>
+
+ <property>
+ <description>Comma-separated paths to job jars on DFS (optional)</description>
+ <name>tez.ampool.mr-am.job-jar-path</name>
+ <value></value>
+ </property>
+
+ <property>
+ <description>Main class for Lazy MRAM</description>
+ <name>tez.ampool.mr-am.application-master-class</name>
+ <value>org.apache.hadoop.mapreduce.v2.app2.lazy.LazyMRAppMaster</value>
+ </property>
+
+ <property>
+ <description>Local filesystem path for staging local data used by AMPoolClient/AMPoolService</description>
+ <name>tez.ampool.tmp-dir-path</name>
+ <value>/tmp/ampoolservice/</value>
+ </property>
+
+ <property>
+ <description>Path on FS used by AMPoolService to upload lazy-mr-am config</description>
+ <name>tez.ampool.am.staging-dir</name>
+ <value>/tmp/tez/ampool/staging/</value>
+ </property>
+ -->
+
+</configuration>
+
Added: incubator/tez/tez-ampool/src/main/conf/tez-env.sh
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/conf/tez-env.sh?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/conf/tez-env.sh (added)
+++ incubator/tez/tez-ampool/src/main/conf/tez-env.sh Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+# Copyright 2011 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Set Tez-specific environment variables here.
+
+# The only required environment variables are JAVA_HOME and HADOOP_HOME. All others are
+# optional.
+
+# The java implementation to use. As shipped this re-exports the current value; replace it with an explicit path if needed.
+export JAVA_HOME=${JAVA_HOME}
+
+# Set HADOOP_HOME as needed by Tez
+#export HADOOP_HOME=
+
+# The maximum amount of heap to use, in MB. Default is 1024.
+#export TEZ_HEAPSIZE=
+
+# Where log files are stored. $TEZ_HOME/logs by default.
+#export TEZ_LOG_DIR=${TEZ_LOG_DIR}/$USER
+
+# The directory where pid files are stored. /tmp by default.
+# NOTE: this should be set to a directory that can only be written to by
+# the user that will run the tez daemons. Otherwise there is the
+# potential for a symlink attack.
+export TEZ_PID_DIR=${TEZ_PID_DIR}
+
+# A string representing this instance of tez. $USER by default; a value already set in the environment is kept.
+export TEZ_IDENT_STRING=${TEZ_IDENT_STRING:-$USER}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyAMConfig.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,58 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+public class LazyAMConfig extends Configuration {
+
+ private static final String LAZY_AM_DEFAULT_XML_FILE = "lazy-mram-default.xml";
+ private static final String LAZY_AM_SITE_XML_FILE = "lazy-mram-site.xml";
+ public static final String LAZY_AM_JOB_XML_FILE = "lazy-mram-job.xml";
+
+ static {
+ Configuration.addDefaultResource(LAZY_AM_DEFAULT_XML_FILE);
+ Configuration.addDefaultResource(LAZY_AM_SITE_XML_FILE);
+ Configuration.addDefaultResource(LAZY_AM_JOB_XML_FILE);
+ }
+
+ public static final String LAZY_MR_AM_PREFIX =
+ MRJobConfig.MR_AM_PREFIX + "lazy.";
+
+ public static final String POLLING_INTERVAL_SECONDS =
+ LAZY_MR_AM_PREFIX + "polling-interval.secs";
+ public static final int DEFAULT_POLLING_INTERVAL_SECONDS = 1;
+
+ public static final String PREALLOC_CONTAINER_COUNT =
+ LAZY_MR_AM_PREFIX + "prealloc-container-count";
+ public static final int DEFAULT_PREALLOC_CONTAINER_COUNT = 0;
+
+ public LazyAMConfig() {
+ super();
+ }
+
+ public LazyAMConfig(Configuration conf) {
+ super(conf);
+ if (! (conf instanceof LazyAMConfig)) {
+ this.reloadConfiguration();
+ }
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRAppMaster.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,620 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.httpclient.DefaultHttpMethodRetryHandler;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.httpclient.HttpStatus;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.httpclient.params.HttpMethodParams;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.node.AMNodeMap;
+import org.apache.hadoop.mapreduce.v2.app2.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.tez.ampool.AMPoolConfiguration;
+import org.apache.tez.ampool.rest.ApplicationPollResponse;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.codehaus.jackson.map.ObjectMapper;
+
+public class LazyMRAppMaster extends MRAppMaster {
+
+ private static final Log LOG = LogFactory.getLog(LazyMRAppMaster.class);
+
+ // Loop condition of the job poller (it polls while this is false).
+ private volatile boolean stopped;
+ // Set to true by lazyStart() once initAfterJobSubmission() has completed.
+ private volatile boolean jobInitialized;
+ // Set to true by start() once the main AM has been started for a job.
+ private volatile boolean jobStarted;
+
+ // URL polled for a job assignment; supplied to the constructor.
+ protected final String jobPollingUrl;
+ // Delay between polls; read from the configuration in init().
+ private int jobPollingIntervalSeconds;
+ // Placeholder job handed out by LazyStartAppContext.getJob() before a job
+ // has been started.
+ private NotRunningJob notRunningJob;
+
+ // Services in registration order: initialised and started front to back,
+ // stopped back to front.
+ private List<Service> services;
+
+ /**
+  * Creates an application master that has no job yet.
+  *
+  * All arguments except {@code pollingUrl} are passed straight to the
+  * parent constructor.
+  *
+  * @param pollingUrl URL that will be polled for a job assignment
+  */
+ public LazyMRAppMaster(ApplicationAttemptId applicationAttemptId,
+     ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
+     long appSubmitTime, String pollingUrl) {
+   super(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
+       appSubmitTime);
+   services = new ArrayList<Service>();
+   jobPollingUrl = pollingUrl;
+   // Nothing has been submitted, initialised or stopped at construction.
+   jobStarted = false;
+   jobInitialized = false;
+   stopped = false;
+ }
+
+ /**
+ * Second initialisation phase, run once a job configuration is available.
+ *
+ * Derives the app name, job id and committer from the job configuration,
+ * registers the speculator and job history handlers with the dispatcher,
+ * and finally initialises the services added here.
+ *
+ * @param jobConf configuration of the submitted job
+ */
+ private void initAfterJobSubmission(Configuration jobConf) {
+
+ downloadTokensAndSetupUGI(jobConf);
+
+ // Job name is the same as the app name until we support DAG of jobs
+ // for an app later
+ appName = jobConf.get(MRJobConfig.JOB_NAME, "<missing app name>");
+
+
+ newApiCommitter = false;
+ jobId = MRBuilderUtils.newJobId(appAttemptID.getApplicationId(),
+ appAttemptID.getApplicationId().getId());
+ int numReduceTasks = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0);
+ // With reducers the reducer API flag decides; for map-only jobs the
+ // mapper API flag decides.
+ if ((numReduceTasks > 0 &&
+ jobConf.getBoolean("mapred.reducer.new-api", false)) ||
+ (numReduceTasks == 0 &&
+ jobConf.getBoolean("mapred.mapper.new-api", false))) {
+ newApiCommitter = true;
+ LOG.info("Using mapred newApiCommitter.");
+ }
+
+ committer = createOutputCommitter(jobConf);
+ // The two recovery flags below are only logged; no recovery service is
+ // created in this method.
+ boolean recoveryEnabled = jobConf.getBoolean(
+ MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+ boolean recoverySupportedByCommitter = committer.isRecoverySupported();
+
+ // TODO fix - recovery will currently never happen with lazy start
+ // need to follow a different startup route for non-first attempts
+ // for recovery to work with the recovery dispatcher
+ LOG.info("Not starting RecoveryService: recoveryEnabled: "
+ + recoveryEnabled + " recoverySupportedByCommitter: "
+ + recoverySupportedByCommitter + " ApplicationAttemptID: "
+ + appAttemptID.getAttemptId());
+
+
+ if (jobConf.getBoolean(MRJobConfig.MAP_SPECULATIVE, false)
+ || jobConf.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, false)) {
+ //optional service to speculate on task attempts' progress
+ speculator = createSpeculator(jobConf, context);
+ addService(speculator);
+ }
+
+ // The speculator event dispatcher is registered whether or not a
+ // speculator service was created above.
+ speculatorEventDispatcher = new SpeculatorEventDispatcher(jobConf);
+ dispatcher.register(Speculator.EventType.class,
+ speculatorEventDispatcher);
+
+ //service to log job history events
+ jobHistoryEventHandler = createJobHistoryHandler(context);
+ dispatcher.register(org.apache.hadoop.mapreduce.jobhistory.EventType.class,
+ jobHistoryEventHandler);
+
+ // Add the JobHistoryEventHandler last so that it is properly stopped first.
+ // This will guarantee that all history-events are flushed before AM goes
+ // ahead with shutdown.
+ // Note: Even though JobHistoryEventHandler is started last, if any
+ // component creates a JobHistoryEvent in the meanwhile, it will just be
+ // queued inside the JobHistoryEventHandler
+ addService(this.jobHistoryEventHandler);
+
+ // init newly added services
+ // (initServices() skips every service that is not in state NOTINITED)
+ initServices();
+ }
+
+ /**
+  * Supplies the lazy-start variant of the container requestor.
+  *
+  * @param clientService client service passed on to the requestor
+  * @param context application context passed on to the requestor
+  * @return a new {@link LazyRMContainerRequestor}
+  */
+ @Override
+ protected ContainerRequestor createContainerRequestor(
+     ClientService clientService, AppContext context) {
+   ContainerRequestor lazyRequestor =
+       new LazyRMContainerRequestor(clientService, context);
+   return lazyRequestor;
+ }
+
+ /**
+  * Supplies the lazy-start variant of the AM scheduler.
+  *
+  * The scheduler is built from the {@code appContext} argument rather than
+  * the {@code context} field, so the method honours what its caller passes
+  * in, as createContainerRequestor() does. The only visible caller, init(),
+  * passes the {@code context} field, so its behaviour is unchanged.
+  *
+  * @param requestor container requestor the scheduler works with
+  * @param appContext application context for the scheduler
+  * @return a new {@link LazyRMContainerAllocator}
+  */
+ @Override
+ protected ContainerAllocator createAMScheduler(ContainerRequestor requestor,
+     AppContext appContext) {
+   return new LazyRMContainerAllocator(requestor, appContext);
+ }
+
+
+ /**
+ * First initialisation phase: sets up everything that does not need a job.
+ *
+ * Reads the polling interval, creates the lazy app context and the
+ * placeholder job, then creates, registers and initialises the services
+ * below. The parent's init is not invoked from here. Services are added in
+ * the order in which they are later started; they are stopped in reverse.
+ *
+ * @param conf configuration for the AM; this method also writes the
+ *             attempt id and the dispatcher exit-on-error flag into it
+ */
+ @Override
+ public void init(Configuration conf) {
+ this.jobPollingIntervalSeconds =
+ conf.getInt(LazyAMConfig.POLLING_INTERVAL_SECONDS,
+ LazyAMConfig.DEFAULT_POLLING_INTERVAL_SECONDS);
+
+ conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+ appAttemptID.getAttemptId());
+ conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
+ this.conf = conf;
+ // Both times stay at -1 until lazyStart() assigns the real values.
+ super.startTime = -1;
+ super.appSubmitTime = -1;
+
+ context = new LazyStartAppContext(this, conf);
+ notRunningJob = new NotRunningJob(super.appAttemptID,
+ context, conf);
+
+ // add services that can be started without a job
+
+ dispatcher = createDispatcher();
+ addService(dispatcher);
+
+ // service to handle requests from JobClient, start webapp handler
+ clientService = new LazyMRClientService(context);
+ addService(clientService);
+
+ taskHeartbeatHandler = createTaskHeartbeatHandler(context, conf);
+ addService(taskHeartbeatHandler);
+
+ containerHeartbeatHandler = createContainerHeartbeatHandler(context, conf);
+ addService(containerHeartbeatHandler);
+
+ //service to handle requests to TaskUmbilicalProtocol
+ taskAttemptListener = createTaskAttemptListener(context,
+ taskHeartbeatHandler, containerHeartbeatHandler);
+ addService(taskAttemptListener);
+
+ containers = new AMContainerMap(containerHeartbeatHandler,
+ taskAttemptListener, context);
+ addService(containers);
+ dispatcher.register(AMContainerEventType.class, containers);
+
+ nodes = new AMNodeMap(dispatcher.getEventHandler(), context);
+ addService(nodes);
+ dispatcher.register(AMNodeEventType.class, nodes);
+
+ //service to do the task cleanup
+ taskCleaner = createTaskCleaner(context);
+ addService(taskCleaner);
+
+ this.jobEventDispatcher = new JobEventDispatcher();
+
+ //register the event dispatchers
+ dispatcher.register(JobEventType.class, jobEventDispatcher);
+ dispatcher.register(TaskEventType.class, new TaskEventDispatcher());
+ dispatcher.register(TaskAttemptEventType.class,
+ new TaskAttemptEventDispatcher());
+ dispatcher.register(TaskCleaner.EventType.class, taskCleaner);
+
+ // TODO XXX: Rename to NMComm
+ // corresponding service to launch allocated containers via NodeManager
+ // containerLauncher = createNMCommunicator(context);
+ containerLauncher = createContainerLauncher(context);
+ addService(containerLauncher);
+ dispatcher.register(NMCommunicatorEventType.class, containerLauncher);
+
+ // service to allocate containers from RM (if non-uber) or to fake it (uber)
+ // (createContainerRequestor() is overridden in this class to return the
+ // lazy requestor)
+ containerRequestor = createContainerRequestor(clientService, context);
+ addService(containerRequestor);
+ dispatcher.register(RMCommunicatorEventType.class, containerRequestor);
+
+ amScheduler = createAMScheduler(containerRequestor, context);
+ addService(amScheduler);
+ dispatcher.register(AMSchedulerEventType.class, amScheduler);
+
+ // Add the staging directory cleaner before the history server but after
+ // the container allocator so the staging directory is cleaned after
+ // the history has been flushed but before unregistering with the RM.
+ this.stagingDirCleanerService = createStagingDirCleaningService();
+ addService(stagingDirCleanerService);
+
+ // Initialise every service registered above.
+ initServices();
+ }
+
+ /**
+  * Appends {@code o} to the service list when it is a {@link Service};
+  * anything else (including null) is ignored.
+  */
+ private void addService(Object o) {
+   if (!(o instanceof Service)) {
+     return;
+   }
+   Service candidate = (Service) o;
+   services.add(candidate);
+ }
+
+ /**
+  * Initialises, in registration order, every service still in state
+  * NOTINITED; services in any other state are left alone.
+  */
+ private void initServices() {
+   for (Service svc : services) {
+     boolean pendingInit = svc.getServiceState().equals(STATE.NOTINITED);
+     if (!pendingInit) {
+       continue;
+     }
+     svc.init(conf);
+   }
+ }
+
+ /**
+  * Starts, in registration order, every service that is in state INITED;
+  * services in any other state are left alone.
+  */
+ private void startServices() {
+   for (Service svc : services) {
+     boolean readyToStart = svc.getServiceState().equals(STATE.INITED);
+     if (!readyToStart) {
+       continue;
+     }
+     svc.start();
+   }
+ }
+
+ /**
+  * Stops every service that is in state STARTED, walking the list from the
+  * most recently registered service back to the first.
+  */
+ private void stopServices() {
+   int idx = services.size();
+   while (--idx >= 0) {
+     Service svc = services.get(idx);
+     if (!svc.getServiceState().equals(STATE.STARTED)) {
+       continue;
+     }
+     svc.stop();
+   }
+ }
+
+ /**
+ * Starts all initialised services, then either waits for a job or starts
+ * the main AM.
+ *
+ * This method is entered twice: once at launch, when no job is initialised
+ * and the poll loop is run, and once more from lazyStart() after the job
+ * has been initialised, when the main AM and its jobs are started.
+ */
+ @Override
+ public void start() {
+ startServices();
+ if (!jobInitialized) {
+ Thread poller = new Thread(
+ new JobPollerRunnable(jobPollingUrl, jobPollingIntervalSeconds));
+ poller.setName("JobPollerThread");
+ // NOTE(review): run() executes the poll loop on the calling thread; the
+ // Thread object is never started, so this call blocks until the loop
+ // ends and the name set above is not applied to the running thread.
+ // Confirm whether blocking is intended before switching to start().
+ poller.run();
+ } else {
+ LOG.info("Job submitted to LazyMRAM. Starting job");
+ super.startAM(conf);
+ this.jobStarted = true;
+ super.startJobs();
+ }
+ }
+
+ /**
+  * Completes AM start-up for a job that has just been assigned.
+  *
+  * Merges the job configuration file into a copy of the current
+  * configuration, forces uber mode and recovery off, runs the
+  * job-dependent initialisation and then calls start() again. The JVM is
+  * terminated with exit code -1 when the configuration file is not a
+  * non-empty regular file or cannot be read.
+  *
+  * @param jobConfFileLocation path of the job configuration file
+  * @param jobJarLocation job jar location; ignored when null or empty
+  * @param jobSubmissionTime submission time recorded for the job
+  */
+ public void lazyStart(String jobConfFileLocation,
+     String jobJarLocation,
+     long jobSubmissionTime) {
+   super.startTime = super.clock.getTime();
+   super.appSubmitTime = jobSubmissionTime;
+   LOG.info("Launching job using"
+       + ", jobConfLocation=" + jobConfFileLocation
+       + ", jobJarLocation=" + jobJarLocation
+       + ", jobSubmissionTime=" + jobSubmissionTime);
+
+   final String invalidConfMessage = "Invalid conf file provided"
+       + ", jobConfLocation=" + jobConfFileLocation;
+   Path jobConfPath = new Path(jobConfFileLocation);
+   Configuration merged = new Configuration(conf);
+   try {
+     FileSystem fileSystem = FileSystem.get(conf);
+     FileStatus confStatus = fileSystem.getFileStatus(jobConfPath);
+     boolean usable = confStatus.isFile() && confStatus.getLen() > 0;
+     if (usable) {
+       LOG.info("Job Conf File Status"
+           + confStatus.toString());
+       merged.addResource(fileSystem.open(jobConfPath));
+     } else {
+       LOG.error(invalidConfMessage);
+       System.exit(-1);
+     }
+   } catch (IOException ioe) {
+     LOG.error(invalidConfMessage, ioe);
+     System.exit(-1);
+   }
+
+   boolean jarGiven = jobJarLocation != null && !jobJarLocation.isEmpty();
+   if (jarGiven) {
+     merged.set(MRJobConfig.JOB_JAR, jobJarLocation);
+   }
+
+   // Explicitly disable uber jobs as AM cannot be relocalized
+   merged.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+
+   // for now disable recovery
+   // TODO fix
+   merged.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, false);
+
+   this.conf = new JobConf(merged);
+   LazyStartAppContext lazyContext = (LazyStartAppContext) context;
+   lazyContext.setConfiguration(conf);
+
+   // Init and start main AM
+   LOG.info("Initializing main MR AM");
+   initAfterJobSubmission(conf);
+   jobInitialized = true;
+   LOG.info("Starting main MR AM");
+   start();
+   LOG.info("Started main MR AM");
+ }
+
+ /**
+  * Stops the AM: marks it as stopped and then stops all started services
+  * in reverse registration order.
+  *
+  * The flag is set first so that the job poller, which loops while
+  * {@code stopped} is false, ends on its next iteration. Previously the
+  * flag was never set to true, so stop() could not end the poll loop.
+  */
+ @Override
+ public void stop() {
+   this.stopped = true;
+   stopServices();
+ }
+
+ public static void main(String[] args) {
+ try {
+ Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+ DeprecatedKeys.init();
+ String containerIdStr =
+ System.getenv(ApplicationConstants.AM_CONTAINER_ID_ENV);
+ String nodeHostString = System.getenv(ApplicationConstants.NM_HOST_ENV);
+ String nodePortString = System.getenv(ApplicationConstants.NM_PORT_ENV);
+ String nodeHttpPortString =
+ System.getenv(ApplicationConstants.NM_HTTP_PORT_ENV);
+ String appSubmitTimeStr =
+ System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
+ String pollingUrl =
+ System.getenv(AMPoolConfiguration.LAZY_AM_POLLING_URL_ENV);
+
+ MRAppMaster.validateInputParam(containerIdStr,
+ ApplicationConstants.AM_CONTAINER_ID_ENV);
+ MRAppMaster.validateInputParam(nodeHostString, ApplicationConstants.NM_HOST_ENV);
+ MRAppMaster.validateInputParam(nodePortString, ApplicationConstants.NM_PORT_ENV);
+ MRAppMaster.validateInputParam(nodeHttpPortString,
+ ApplicationConstants.NM_HTTP_PORT_ENV);
+ MRAppMaster.validateInputParam(appSubmitTimeStr,
+ ApplicationConstants.APP_SUBMIT_TIME_ENV);
+ MRAppMaster.validateInputParam(pollingUrl,
+ AMPoolConfiguration.LAZY_AM_POLLING_URL_ENV);
+
+ ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+ ApplicationAttemptId applicationAttemptId =
+ containerId.getApplicationAttemptId();
+ long appSubmitTime = Long.parseLong(appSubmitTimeStr);
+ pollingUrl += applicationAttemptId.toString();
+ LOG.info("Polling url for lazy init of job is "
+ + pollingUrl);
+
+ LazyMRAppMaster appMaster =
+ new LazyMRAppMaster(applicationAttemptId, containerId, nodeHostString,
+ Integer.parseInt(nodePortString),
+ Integer.parseInt(nodeHttpPortString), appSubmitTime,
+ pollingUrl);
+ ShutdownHookManager.get().addShutdownHook(
+ new LazyMRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
+ YarnConfiguration conf = new YarnConfiguration(new LazyAMConfig());
+ String jobUserName = System
+ .getenv(ApplicationConstants.Environment.USER.name());
+ conf.set(MRJobConfig.USER_NAME, jobUserName);
+ // Do not automatically close FileSystem objects so that in case of
+ // SIGTERM I have a chance to write out the job history. I'll be closing
+ // the objects myself.
+ conf.setBoolean("fs.automatic.close", false);
+ initAndStartAppMaster(appMaster, conf, jobUserName);
+ } catch (Throwable t) {
+ LOG.fatal("Error starting MRAppMaster", t);
+ System.exit(1);
+ }
+ }
+
+ /**
+  * Initialises and starts the application master as the job's user.
+  *
+  * @param appMaster application master to bring up
+  * @param conf configuration handed to UserGroupInformation and to
+  *             {@code appMaster.init}
+  * @param jobUserName name of the remote user to run as
+  * @throws IOException propagated from {@code doAs}
+  * @throws InterruptedException propagated from {@code doAs}
+  */
+ protected static void initAndStartAppMaster(final LazyMRAppMaster appMaster,
+     final YarnConfiguration conf, String jobUserName) throws IOException,
+     InterruptedException {
+   UserGroupInformation.setConfiguration(conf);
+   UserGroupInformation jobUserUgi =
+       UserGroupInformation.createRemoteUser(jobUserName);
+
+   PrivilegedExceptionAction<Object> bringUp =
+       new PrivilegedExceptionAction<Object>() {
+         @Override
+         public Object run() throws Exception {
+           appMaster.init(conf);
+           appMaster.start();
+           return null;
+         }
+       };
+   jobUserUgi.doAs(bringUp);
+ }
+
+ private class JobPollerRunnable implements Runnable {
+
+ private final Log LOG = LogFactory.getLog(JobPollerRunnable.class);
+ private final HttpClient httpClient;
+ private final GetMethod getMethod;
+ private final int pollingIntervalSeconds;
+
+ public JobPollerRunnable(String pollingUrl,
+ int pollingIntervalSeconds) {
+ this.httpClient = new HttpClient();
+ this.getMethod = new GetMethod(pollingUrl);
+ this.getMethod.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
+ new DefaultHttpMethodRetryHandler(1, false));
+ this.pollingIntervalSeconds = pollingIntervalSeconds;
+ }
+
+ @Override
+ public void run() {
+ while (!stopped) {
+ boolean exceptionThrown = false;
+ try {
+ LOG.debug("Polling for job"
+ + ", pollingUrl=" + jobPollingUrl);
+ int responseCode = httpClient.executeMethod(getMethod);
+ if (responseCode == HttpStatus.SC_OK) {
+ String responseBody = getMethod.getResponseBodyAsString();
+ ObjectMapper mapper = new ObjectMapper();
+ ApplicationPollResponse response =
+ mapper.readValue(responseBody,
+ ApplicationPollResponse.class);
+ LOG.info("Received poll response"
+ + ", pollingUrl=" + jobPollingUrl
+ + ", pollResonse=" + response);
+ lazyStart(response.getConfigurationFileLocation(),
+ response.getApplicationJarLocation(),
+ response.getApplicationSubmissionTime());
+ break;
+ }
+ } catch (HttpException e) {
+ LOG.error(e);
+ exceptionThrown = true;
+ } catch (IOException e) {
+ LOG.error(e);
+ exceptionThrown = true;
+ }
+ if (exceptionThrown) {
+ LOG.info("Could not connect to polling url. Shutting down");
+ System.exit(-1);
+ }
+ try {
+ Thread.sleep(pollingIntervalSeconds*1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ getMethod.releaseConnection();
+ }
+ }
+
+
+ /**
+  * Shutdown hook that stops the given application master. It runs when a
+  * signal is received AND during normal close of the JVM.
+  */
+ static class LazyMRAppMasterShutdownHook implements Runnable {
+   LazyMRAppMaster appMaster;
+
+   LazyMRAppMasterShutdownHook(LazyMRAppMaster master) {
+     appMaster = master;
+   }
+
+   @Override
+   public void run() {
+     // TODO
+     this.appMaster.stop();
+   }
+ }
+
+ public class LazyStartAppContext extends RunningAppContext {
+
+ public LazyStartAppContext(LazyMRAppMaster appMaster,
+ Configuration config) {
+ super(config);
+ }
+
+ @Override
+ public ApplicationAttemptId getApplicationAttemptId() {
+ return appAttemptID;
+ }
+
+ @Override
+ public ApplicationId getApplicationID() {
+ return appAttemptID.getApplicationId();
+ }
+
+ @Override
+ public String getApplicationName() {
+ if (jobInitialized) {
+ return super.getApplicationName();
+ }
+ return "Job not initialized";
+ }
+
+ public long getSubmitTime() {
+ return appSubmitTime;
+ }
+
+ @Override
+ public long getStartTime() {
+ return super.getStartTime();
+ }
+
+ @Override
+ public Job getJob(JobId jobID) {
+ if (!jobStarted && super.getAllJobs().isEmpty()) {
+ LOG.info("Getting polled when job is not initialized yet."
+ + " Sending back dummy not running job");
+ return notRunningJob;
+ }
+ return super.getJob(jobID);
+ }
+
+ @Override
+ public Map<JobId, Job> getAllJobs() {
+ return super.getAllJobs();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Override
+ public EventHandler getEventHandler() {
+ return super.getEventHandler();
+ }
+
+ @Override
+ public CharSequence getUser() {
+ if (jobInitialized) {
+ return conf.get(MRJobConfig.USER_NAME);
+ }
+ return "";
+ }
+
+ @Override
+ public Clock getClock() {
+ return super.getClock();
+ }
+
+ @Override
+ public ClusterInfo getClusterInfo() {
+ return super.getClusterInfo();
+ }
+
+ @Override
+ public AMContainerMap getAllContainers() {
+ return super.getAllContainers();
+ }
+
+ @Override
+ public AMNodeMap getAllNodes() {
+ return super.getAllNodes();
+ }
+
+ @Override
+ public Map<ApplicationAccessType, String> getApplicationACLs() {
+ if (!jobStarted) {
+ throw new YarnException(
+ "Cannot get ApplicationACLs before all services have started");
+ }
+ return containerRequestor.getApplicationACLs();
+ }
+
+ }
+
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyMRClientService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportResponse;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.MRClientService;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyMRAppMaster.LazyStartAppContext;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.util.Records;
+
+public class LazyMRClientService extends MRClientService {
+
+ public LazyMRClientService(AppContext appContext) {
+ super(LazyMRClientService.class.getName(),
+ appContext);
+ super.setProtocolHandler(new LazyMRClientProtocolHandler(appContext));
+ }
+
+ class LazyMRClientProtocolHandler extends MRClientProtocolHandler {
+
+ private LazyStartAppContext lazyStartAppContext;
+
+ public LazyMRClientProtocolHandler(AppContext appContext) {
+ super();
+ if (appContext instanceof LazyStartAppContext) {
+ this.lazyStartAppContext = (LazyStartAppContext) appContext;
+ } else {
+ this.lazyStartAppContext = null;
+ }
+ }
+
+ @Override
+ public GetJobReportResponse getJobReport(GetJobReportRequest request)
+ throws YarnRemoteException {
+ if (lazyStartAppContext != null
+ && lazyStartAppContext.getJob(request.getJobId()) == null) {
+ // create dummy job report as client will start polling before AM is
+ // fully up
+ List<AMInfo> amInfos = new ArrayList<AMInfo>();
+ JobReport jobReport = MRBuilderUtils.newJobReport(request.getJobId(),
+ "", "", JobState.NEW,
+ -1, -1, -1, 0, 0, 0, 0, "", amInfos , false, "");
+ GetJobReportResponse response =
+ Records.newRecord(GetJobReportResponse.class);
+ response.setJobReport(jobReport);
+ return response;
+ }
+ return super.getJobReport(request);
+ }
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerAllocator.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.NotRunningJob;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+
+public class LazyRMContainerAllocator extends RMContainerAllocator {
+
+ private static final Log LOG = LogFactory.getLog(
+ LazyRMContainerAllocator.class);
+
+ public LazyRMContainerAllocator(ContainerRequestor requestor,
+ AppContext appContext) {
+ super(requestor, appContext);
+ }
+
+ @Override
+ protected synchronized void assignContainers() {
+ Job j = getJob();
+ if (j == null || j instanceof NotRunningJob) {
+ LOG.debug("Nothing to do in assignContainers as no job submitted");
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Job submitted. Assigning containers");
+ }
+ super.assignContainers();
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/hadoop/mapreduce/v2/app2/lazy/LazyRMContainerRequestor.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.lazy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Container requestor that can queue a configured number of container
+ * requests as soon as it is started, before any job has been assigned.
+ */
+public class LazyRMContainerRequestor extends RMContainerRequestor {
+
+  private static final Log LOG =
+      LogFactory.getLog(LazyRMContainerRequestor.class);
+
+  private Configuration conf;
+  private int preAllocContainerCount;
+  private int preAllocContainerMemoryMB;
+
+  public LazyRMContainerRequestor(ClientService clientService,
+      AppContext context) {
+    super(clientService, context);
+  }
+
+  /**
+   * Reads the pre-allocation count and the map memory setting, rejects
+   * negative values and then initialises the parent.
+   *
+   * @throws IllegalArgumentException when either value is negative
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    this.preAllocContainerCount = conf.getInt(
+        LazyAMConfig.PREALLOC_CONTAINER_COUNT,
+        LazyAMConfig.DEFAULT_PREALLOC_CONTAINER_COUNT);
+    this.preAllocContainerMemoryMB = conf.getInt(
+        MRJobConfig.MAP_MEMORY_MB,
+        MRJobConfig.DEFAULT_MAP_MEMORY_MB);
+
+    LOG.info("Prealloc container count for LazyAM is "
+        + preAllocContainerCount
+        + ", memoryMB=" + preAllocContainerMemoryMB);
+
+    boolean countValid = preAllocContainerCount >= 0;
+    boolean memoryValid = preAllocContainerMemoryMB >= 0;
+    if (!(countValid && memoryValid)) {
+      throw new IllegalArgumentException("Invalid config values specified"
+          + ", preAllocationContainerCount=" + preAllocContainerCount
+          + ", mapMemoryMB=" + preAllocContainerMemoryMB);
+    }
+
+    super.init(conf);
+  }
+
+  /**
+   * Starts the parent and, when the pre-allocation count is positive, adds
+   * that many container requests sharing one capability record.
+   */
+  @Override
+  public void start() {
+    super.start();
+    if (preAllocContainerCount > 0) {
+      Resource capability = Records.newRecord(Resource.class);
+      capability.setMemory(preAllocContainerMemoryMB);
+      capability.setVirtualCores(1);
+
+      final String[] noEntries = new String[0];
+      int remaining = preAllocContainerCount;
+      while (remaining-- > 0) {
+        ContainerRequest request = new ContainerRequest(capability, noEntries,
+            noEntries, RMContainerAllocator.PRIORITY_MAP);
+        super.addContainerReq(request);
+      }
+    }
+  }
+
+  /** Returns the configuration captured by init(). */
+  @Override
+  public synchronized Configuration getConfig() {
+    return this.conf;
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMContext.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+
+public class AMContext {
+
+ private final ApplicationId applicationId;
+ private ApplicationSubmissionContext submissionContext;
+ private ApplicationAttemptId currentApplicationAttemptId;
+ private long applicationSubmissionTime;
+ private long jobPickUpTime = -1;
+
+ public AMContext(ApplicationId applicationId) {
+ this.applicationId = applicationId;
+ this.submissionContext = null;
+ this.currentApplicationAttemptId = null;
+ }
+
+ public ApplicationId getApplicationId() {
+ return applicationId;
+ }
+
+ public synchronized ApplicationSubmissionContext getSubmissionContext() {
+ return submissionContext;
+ }
+
+ public synchronized void setSubmissionContext(ApplicationSubmissionContext submissionContext) {
+ this.submissionContext = submissionContext;
+ }
+
+ public synchronized ApplicationAttemptId getCurrentApplicationAttemptId() {
+ return currentApplicationAttemptId;
+ }
+
+ public synchronized void setCurrentApplicationAttemptId(
+ ApplicationAttemptId currentApplicationAttemptId) {
+ this.currentApplicationAttemptId = currentApplicationAttemptId;
+ }
+
+ public synchronized long getApplicationSubmissionTime() {
+ return applicationSubmissionTime;
+ }
+
+ public synchronized void setApplicationSubmissionTime(long applicationSubmissionTime) {
+ this.applicationSubmissionTime = applicationSubmissionTime;
+ }
+
+ public synchronized long getJobPickUpTime() {
+ return jobPickUpTime;
+ }
+
+ public synchronized void setJobPickUpTime(long jobPickUpTime) {
+ this.jobPickUpTime = jobPickUpTime;
+ }
+
+ @Override
+ public String toString() {
+ return "applicationId=" + applicationId
+ + ", applicationSubmissionContextIsNull="
+ + (getSubmissionContext() == null)
+ + ", jobPickupTime=" + jobPickUpTime
+ + ", applicationSubmissionTime=" + getApplicationSubmissionTime()
+ + ", applicationAttemptIdIsNull=" +
+ (getCurrentApplicationAttemptId() == null);
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMLauncher.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+/**
+ * Single-method callback for launching an AM. The method takes no
+ * arguments, returns nothing and declares no checked exceptions.
+ */
+public interface AMLauncher {
+
+  /** Launches an AM; what is launched and how is up to the implementation. */
+  void launchAM();
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event of type {@link AMMonitorEventType} that carries the id of the
+ * application it refers to.
+ */
+public class AMMonitorEvent extends AbstractEvent<AMMonitorEventType> {
+
+  /** Application this event refers to. */
+  private final ApplicationId applicationId;
+
+  /**
+   * @param type kind of monitor event, forwarded to the superclass
+   * @param applicationId application the event refers to
+   */
+  public AMMonitorEvent(AMMonitorEventType type, ApplicationId applicationId) {
+    super(type);
+    this.applicationId = applicationId;
+  }
+
+  /** @return the application id supplied at construction */
+  public ApplicationId getApplicationId() {
+    return this.applicationId;
+  }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorEventType.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+/**
+ * Kinds of {@link AMMonitorEvent}, as handled by AMMonitorService.
+ */
+public enum AMMonitorEventType {
+  /** Begin monitoring the application named in the event. */
+  AM_MONITOR_START,
+  /** Stop monitoring the application named in the event. */
+  AM_MONITOR_STOP;
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMMonitorService.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.ampool.manager.AMFinishedEvent;
+
+public class AMMonitorService extends AbstractService
+ implements EventHandler<AMMonitorEvent> {
+
+ private static final Log LOG = LogFactory.getLog(AMMonitorService.class);
+
+ private final AMPoolContext context;
+ private ScheduledExecutorService scheduler = null;
+ Map<ApplicationId, ScheduledFuture<?>> scheduledFutures;
+ private long heartbeatInterval = 1000l;
+
+ public AMMonitorService(AMPoolContext context) {
+ super(AMMonitorService.class.getName());
+ // TODO Auto-generated constructor stub
+ this.context = context;
+ this.scheduledFutures = new HashMap<ApplicationId, ScheduledFuture<?>>();
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ scheduler = Executors.newScheduledThreadPool(
+ conf.getInt(AMPoolConfiguration.MAX_AM_POOL_SIZE,
+ AMPoolConfiguration.DEFAULT_MAX_AM_POOL_SIZE));
+
+ super.init(conf);
+ }
+
+ @Override
+ public void start() {
+ LOG.info("Starting AMMonitorService");
+ super.start();
+ }
+
+ @Override
+ public synchronized void stop() {
+ LOG.info("Stopping AMMonitorService");
+ if (scheduler != null) {
+ scheduler.shutdownNow();
+ }
+ }
+
+ private synchronized void addAppToMonitor(ApplicationId applicationId) {
+ LOG.info("Starting to monitor application"
+ + ", applicationId=" + applicationId);
+ AMMonitorTask amMonitorTask =
+ new AMMonitorTask(applicationId);
+ ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(
+ amMonitorTask, heartbeatInterval, heartbeatInterval,
+ TimeUnit.MILLISECONDS);
+ scheduledFutures.put(applicationId, future);
+ }
+
+ private synchronized void removeAppFromMonitor(ApplicationId applicationId) {
+ LOG.info("Stopping to monitor application"
+ + ", applicationId=" + applicationId);
+ if (!scheduledFutures.containsKey(applicationId)) {
+ return;
+ }
+ scheduledFutures.get(applicationId).cancel(true);
+ scheduledFutures.remove(applicationId);
+ }
+
+ private class AMMonitorTask implements Runnable {
+ private final ApplicationId applicationId;
+
+ public AMMonitorTask(ApplicationId applicationId) {
+ this.applicationId = applicationId;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void run() {
+ try {
+ LOG.info("Trying to monitor status for application"
+ + ", applicationId=" + applicationId);
+ ApplicationReport report =
+ context.getRMYarnClient().getApplicationReport(applicationId);
+ if (report == null) {
+ LOG.warn("Received null report from RM for"
+ + " applicationId=" + applicationId);
+ return;
+ }
+ LOG.info("Monitoring status for application"
+ + ", applicationId=" + applicationId
+ + ", yarnApplicationState=" + report.getYarnApplicationState()
+ + ", finalApplicationStatus="
+ + report.getFinalApplicationStatus());
+
+ // TODO needs a more cleaner fix
+ if (report.getYarnApplicationState() != null) {
+ if (report.getYarnApplicationState()
+ == YarnApplicationState.FAILED
+ || report.getYarnApplicationState()
+ == YarnApplicationState.KILLED
+ || report.getYarnApplicationState()
+ == YarnApplicationState.FINISHED) {
+ if (report.getFinalApplicationStatus() ==
+ FinalApplicationStatus.UNDEFINED) {
+ report.setFinalApplicationStatus(FinalApplicationStatus.FAILED);
+ }
+ }
+ }
+ if (report.getFinalApplicationStatus() ==
+ FinalApplicationStatus.FAILED
+ || report.getFinalApplicationStatus() ==
+ FinalApplicationStatus.SUCCEEDED
+ || report.getFinalApplicationStatus() ==
+ FinalApplicationStatus.KILLED) {
+ AMFinishedEvent event = new AMFinishedEvent(applicationId,
+ report.getFinalApplicationStatus());
+ context.getDispatcher().getEventHandler().handle(event);
+ AMMonitorEvent mEvent = new AMMonitorEvent(
+ AMMonitorEventType.AM_MONITOR_STOP, applicationId);
+ context.getDispatcher().getEventHandler().handle(mEvent);
+ }
+ } catch (YarnRemoteException e) {
+ // TODO Auto-generated catch block
+ LOG.error("Failed to acquire monitoring status for application"
+ + ", applicationId=" + applicationId);
+ e.printStackTrace();
+ }
+ }
+
+ }
+
+ @Override
+ public synchronized void handle(AMMonitorEvent event) {
+ if (event.getType() == AMMonitorEventType.AM_MONITOR_START) {
+ addAppToMonitor(event.getApplicationId());
+ } else if (event.getType() == AMMonitorEventType.AM_MONITOR_STOP) {
+ removeAppFromMonitor(event.getApplicationId());
+ }
+ }
+
+}
Added: incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java (added)
+++ incubator/tez/tez-ampool/src/main/java/org/apache/tez/ampool/AMPoolClientRMProxy.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.tez.ampool;
+
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapreduce.v2.app2.lazy.LazyAMConfig;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * {@link ClientRMProtocol} endpoint that sits between clients and the RM.
+ *
+ * <p>Requests concerning applications that the {@link AMPoolContext}
+ * manages (new application ids, submissions, and the reported state of a
+ * job that has not been picked up yet) are answered with the help of the
+ * context. Everything else is forwarded to the RM through
+ * {@code context.getRMYarnClient()}. Delegation-token operations are not
+ * supported.
+ */
+public class AMPoolClientRMProxy extends AbstractService
+    implements ClientRMProtocol {
+
+  private static final Log LOG = LogFactory.getLog(
+      AMPoolClientRMProxy.class);
+
+  private final AMPoolContext context;
+  /** RPC server created in {@link #start()}; {@code null} before that. */
+  private Server server;
+  /** Configured bind address; replaced by the actual one after start. */
+  InetSocketAddress clientBindAddress;
+
+  public AMPoolClientRMProxy(AMPoolContext context) {
+    super(AMPoolClientRMProxy.class.getName());
+    this.context = context;
+  }
+
+  /**
+   * Resolves the proxy's bind address and refuses to continue if it is
+   * equal to the configured RM address.
+   *
+   * @throws RuntimeException if the proxy address equals the RM address
+   */
+  @Override
+  public void init(Configuration conf) {
+    clientBindAddress = getBindAddress(conf);
+
+    InetSocketAddress rmAddress =
+        conf.getSocketAddr(YarnConfiguration.RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_ADDRESS,
+            YarnConfiguration.DEFAULT_RM_PORT);
+
+    if (clientBindAddress.equals(rmAddress)) {
+      throw new RuntimeException(
+          "AMPoolService's RM proxy is running on the same address"
+          + " as the main RM"
+          + ", AMPoolServiceBindAddress=" + clientBindAddress.toString()
+          + ", RMBindAddress=" + rmAddress.toString());
+    }
+
+    super.init(conf);
+  }
+
+  /**
+   * Starts the RPC server for {@link ClientRMProtocol} on the bind address
+   * and records the address the server actually listens on.
+   */
+  @Override
+  public void start() {
+    LOG.info("Starting AMPoolClientRMProxy service");
+    Configuration conf = getConfig();
+    YarnRPC rpc = YarnRPC.create(conf);
+    LOG.info("Starting ClientRMProtocol listener at "
+        + clientBindAddress.toString());
+    this.server =
+        rpc.getServer(ClientRMProtocol.class, this,
+            clientBindAddress,
+            conf, null,
+            conf.getInt(AMPoolConfiguration.RM_PROXY_CLIENT_THREAD_COUNT,
+                AMPoolConfiguration.DEFAULT_RM_PROXY_CLIENT_THREAD_COUNT));
+
+    this.server.start();
+    clientBindAddress = conf.updateConnectAddr(
+        AMPoolConfiguration.RM_PROXY_CLIENT_ADDRESS,
+        server.getListenerAddress());
+    LOG.info("Started ClientRMProtocol listener at "
+        + clientBindAddress.toString());
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    LOG.info("Stopping AMPoolClientRMProxy service");
+    if (this.server != null) {
+      this.server.stop();
+    }
+    super.stop();
+  }
+
+  /** @return the proxy's bind address as configured in {@code conf} */
+  InetSocketAddress getBindAddress(Configuration conf) {
+    return conf.getSocketAddr(AMPoolConfiguration.RM_PROXY_CLIENT_ADDRESS,
+        AMPoolConfiguration.DEFAULT_RM_PROXY_CLIENT_ADDRESS,
+        AMPoolConfiguration.DEFAULT_RM_PROXY_CLIENT_PORT);
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnRemoteException {
+    throw new UnsupportedOperationException(
+        "cancelDelegationToken is not supported by AMPoolClientRMProxy");
+  }
+
+  /**
+   * Hands out an application id from the pool context when one is
+   * available; otherwise forwards the request to the RM.
+   */
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnRemoteException {
+    ApplicationId appId = context.getNewApplicationId();
+    if (appId != null) {
+      LOG.info("Received a getNewApplication request"
+          + ", assignedAppId=" + appId.toString());
+      GetNewApplicationResponse response = Records.newRecord(
+          GetNewApplicationResponse.class);
+      response.setApplicationId(appId);
+      response.setMaximumResourceCapability(
+          context.getMaxResourceCapability());
+      response.setMinimumResourceCapability(
+          context.getMinResourceCapability());
+      return response;
+    }
+    LOG.info("Received a getNewApplication request, proxying to real RM");
+    return context.getRMYarnClient().getNewApplication();
+  }
+
+  /** Forwards the kill request to the RM. */
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnRemoteException {
+    // TODO handle kills for managed AMs
+    context.getRMYarnClient().killApplication(
+        request.getApplicationId());
+    return Records.newRecord(KillApplicationResponse.class);
+  }
+
+  /**
+   * Returns the RM's report for the application. For a managed application
+   * that the RM reports as RUNNING, the state in the report is rewritten to
+   * NEW while no job has been submitted to it, and to SUBMITTED while a
+   * submitted job has not been picked up yet or was picked up less than
+   * the poll-wait interval ago.
+   */
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnRemoteException {
+    GetApplicationReportResponse response = Records.newRecord(
+        GetApplicationReportResponse.class);
+    ApplicationId applicationId = request.getApplicationId();
+    ApplicationReport applicationReport =
+        context.getRMYarnClient().getApplicationReport(applicationId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received an application report request"
+          + ", applicationId=" + applicationId);
+    }
+
+    // Handle submitted jobs that have not been picked up by AM
+    long jobPickUpTime = -1;
+    if (applicationReport != null
+        && applicationReport.getYarnApplicationState()
+            == YarnApplicationState.RUNNING
+        && context.isManagedApp(applicationId)) {
+      AMContext amContext = context.getAMContext(applicationId);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AMContext is null? = " + (amContext == null));
+        if (amContext != null) {
+          LOG.debug("AMContext dump: " + amContext.toString());
+        }
+      }
+      if (amContext == null || amContext.getSubmissionContext() == null) {
+        applicationReport.setYarnApplicationState(
+            YarnApplicationState.NEW);
+      } else {
+        // TODO fix
+        // Needs a pingback from the AM to really fix this correctly
+        // 5 second delay is not necessarily a guarantee
+        // NOTE(review): the factor 5000 yields five polling intervals in
+        // milliseconds, which is "5 seconds" only for a one-second
+        // interval; confirm the intent. Long arithmetic keeps a large
+        // configured interval from overflowing.
+        long pollWaitIntervalMs = getConfig().getInt(
+            LazyAMConfig.POLLING_INTERVAL_SECONDS,
+            LazyAMConfig.DEFAULT_POLLING_INTERVAL_SECONDS) * 5000L;
+        // Read the pick-up time once so the checks below and the log line
+        // all see the same value.
+        long pickUpTime = amContext.getJobPickUpTime();
+        if (amContext.getCurrentApplicationAttemptId() == null
+            || pickUpTime <= 0
+            || System.currentTimeMillis()
+                < pickUpTime + pollWaitIntervalMs) {
+          jobPickUpTime = pickUpTime;
+          applicationReport.setYarnApplicationState(
+              YarnApplicationState.SUBMITTED);
+        }
+      }
+      LOG.info("Received an application report request"
+          + ", applicationId=" + applicationId
+          + ", applicationState="
+          + applicationReport.getYarnApplicationState()
+          + ", jobPickupTime=" + jobPickUpTime);
+    }
+    response.setApplicationReport(applicationReport);
+    return response;
+  }
+
+  /** Forwards the request to the RM. */
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnRemoteException {
+    GetClusterMetricsResponse response = Records.newRecord(
+        GetClusterMetricsResponse.class);
+    response.setClusterMetrics(
+        context.getRMYarnClient().getYarnClusterMetrics());
+    return response;
+  }
+
+  /** Forwards the request to the RM. */
+  @Override
+  public GetAllApplicationsResponse getAllApplications(
+      GetAllApplicationsRequest request) throws YarnRemoteException {
+    GetAllApplicationsResponse response = Records.newRecord(
+        GetAllApplicationsResponse.class);
+    response.setApplicationList(
+        context.getRMYarnClient().getApplicationList());
+    return response;
+  }
+
+  /** Forwards the request to the RM. */
+  @Override
+  public GetClusterNodesResponse getClusterNodes(
+      GetClusterNodesRequest request) throws YarnRemoteException {
+    GetClusterNodesResponse response = Records.newRecord(
+        GetClusterNodesResponse.class);
+    response.setNodeReports(context.getRMYarnClient().getNodeReports());
+    return response;
+  }
+
+  /** Forwards the request, with its queue name, to the RM. */
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnRemoteException {
+    GetQueueInfoResponse response = Records.newRecord(
+        GetQueueInfoResponse.class);
+    response.setQueueInfo(context.getRMYarnClient().getQueueInfo(
+        request.getQueueName()));
+    return response;
+  }
+
+  /** Forwards the request to the RM. */
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
+    GetQueueUserAclsInfoResponse response =
+        Records.newRecord(GetQueueUserAclsInfoResponse.class);
+    response.setUserAclsInfoList(
+        context.getRMYarnClient().getQueueAclsInfo());
+    return response;
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnRemoteException {
+    throw new UnsupportedOperationException(
+        "getDelegationToken is not supported by AMPoolClientRMProxy");
+  }
+
+  /** Not supported; always throws {@link UnsupportedOperationException}. */
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnRemoteException {
+    throw new UnsupportedOperationException(
+        "renewDelegationToken is not supported by AMPoolClientRMProxy");
+  }
+
+  /**
+   * Submits an application: managed applications are handed to the pool
+   * context, all others are forwarded to the RM.
+   */
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnRemoteException {
+    ApplicationSubmissionContext submissionContext =
+        request.getApplicationSubmissionContext();
+    ApplicationId appId = submissionContext.getApplicationId();
+    if (!context.isManagedApp(appId)) {
+      LOG.info("Received a submitApplication request for non-managed app"
+          + ", proxying to real RM"
+          + ", appId=" + appId);
+      context.getRMYarnClient().submitApplication(submissionContext);
+      return Records.newRecord(SubmitApplicationResponse.class);
+    }
+    LOG.info("Received a submitApplication request for managed app"
+        + ", appId=" + appId);
+    return context.submitApplication(request);
+  }
+
+}